Skip to content

feat: Sessions - bidirectional durable agent streams#3417

Open
ericallam wants to merge 6 commits intomainfrom
feature/tri-8627-session-primitive-server-side-schema-routes-clickhouse
Open

feat: Sessions - bidirectional durable agent streams#3417
ericallam wants to merge 6 commits intomainfrom
feature/tri-8627-session-primitive-server-side-schema-routes-clickhouse

Conversation

@ericallam
Copy link
Copy Markdown
Member

@ericallam ericallam commented Apr 20, 2026

What this enables

A new first-class primitive, Session, for durable bidirectional I/O that outlives a single run. Sessions give you a server-managed channel pair (.out from the task, .in from the client) that you can write to, read from, and subscribe to across many runs, filter, list, and close, all through a single identifier.

Use cases unblocked

  • Chat agents that persist across turns. Turns 1..N attach to the same Session. The UI subscribes once and keeps receiving output as new runs attach.
  • Approval loops and long-running tasks with user feedback. The task waits on .in, the client writes to .in, and the server enforces no-writes-after-close.
  • Workflow progress streams that live past the run. A dashboard can subscribe to .out after the task finishes to replay the history.
  • Any session-scoped state where pre-existing run streams (scoped to a single run) were too narrow.

Public API surface

Control plane

  • POST /api/v1/sessions to create. Idempotent when you supply externalId.
  • GET /api/v1/sessions/:session to retrieve by friendlyId (session_abc) or by your own externalId. The server disambiguates via the session_ prefix.
  • GET /api/v1/sessions to list with filters (type, tag, taskIdentifier, externalId, derived status = ACTIVE/CLOSED/EXPIRED, created-at period/from/to) and cursor pagination. Backed by ClickHouse.
  • PATCH /api/v1/sessions/:session to update tags/metadata/externalId.
  • POST /api/v1/sessions/:session/close to terminate. Idempotent, hard-blocks new server-brokered writes.

Realtime

  • PUT /realtime/v1/sessions/:session/:io to initialize a channel. Returns S2 credentials in headers so clients can write direct to S2 for high-throughput cases.
  • GET /realtime/v1/sessions/:session/:io for SSE subscribe.
  • POST /realtime/v1/sessions/:session/:io/append for server-side appends.

Scopes

  • sessions is now a ResourceType. read:sessions:{id}, write:sessions:{id}, admin:sessions:{id} all flow through the existing JWT validator.

Implementation summary

Postgres (Session table)

  • Scalar scoping columns (projectId, runtimeEnvironmentId, environmentType, organizationId) with no foreign keys. Matches the January TaskRun FK-removal decision, keeps the write path partition-friendly.
  • Point-lookup indexes only: friendlyId unique, (env, externalId) unique, expiresAt. List queries are served from ClickHouse, so Postgres stays insert-heavy.
  • Terminal markers (closedAt, closedReason, expiresAt) are write-once. No status enum, no counters, no currentRunId pointer. All run-related state is derived.

ClickHouse (sessions_v1)

  • ReplacingMergeTree partitioned by month, ordered by (org_id, project_id, environment_id, created_at, session_id). tags indexed with a tokenbf_v1 skip index.
  • SessionsReplicationService mirrors RunsReplicationService exactly: logical replication with leader-locked consumer, ConcurrentFlushScheduler, retry with exponential backoff + jitter, identical metric shape. Dedicated slot + publication so the two consume independently.
  • SessionsRepository + ClickHouseSessionsRepository expose list / count / tags with the same cursor pagination convention as runs and waitpoints.

S2

  • New key format for session channels: sessions/{friendlyId}/{out|in}. The existing runs/{runId}/{streamId} format for implicit run streams is completely untouched.

What did not change

  • Run-scoped streams.pipe / streams.input still behave exactly as before. They do not create Session rows and the existing routes are unchanged. Sessions are a net-new primitive for the next phase of agent features, not a reshaping of the current streams API.

Verification

  • Webapp typecheck clean (10/10).
  • apps/webapp/test/sessionsReplicationService.test.ts exercises insert and update round-trips through Postgres logical replication into ClickHouse via testcontainers.
  • Live end-to-end against local dev: create, retrieve (friendlyId + externalId), update, .out.initialize, .out.append x2, .in.send, .out.subscribe over SSE, list (type, tag, status, externalId, pagination), close, idempotent re-close. Replicated row lands in ClickHouse within ~1s with closed_reason intact.

Not in this PR

  • Client SDK (lives on the ai-chat feature branch, wires up the runtime ergonomics for chat.agent).
  • Dashboard routes.
  • chat.agent integration.

Test plan

  • pnpm run typecheck --filter webapp
  • pnpm run test --filter webapp ./test/sessionsReplicationService.test.ts --run
  • Start the webapp with SESSION_REPLICATION_CLICKHOUSE_URL and SESSION_REPLICATION_ENABLED=1 set. Confirm the slot and publication auto-create on boot.
  • Hit POST /api/v1/sessions and verify the row replicates to trigger_dev.sessions_v1 within a couple of seconds.
  • POST /api/v1/sessions/:id/close and confirm subsequent POST /realtime/v1/sessions/:id/out/append returns 400.

@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented Apr 20, 2026

🦋 Changeset detected

Latest commit: 4cadc19

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 29 packages
Name Type
@trigger.dev/core Patch
@trigger.dev/build Patch
trigger.dev Patch
@trigger.dev/python Patch
@trigger.dev/redis-worker Patch
@trigger.dev/schema-to-json Patch
@trigger.dev/sdk Patch
@internal/cache Patch
@internal/clickhouse Patch
@internal/llm-model-catalog Patch
@internal/redis Patch
@internal/replication Patch
@internal/run-engine Patch
@internal/schedule-engine Patch
@internal/testcontainers Patch
@internal/tracing Patch
@internal/tsql Patch
@internal/zod-worker Patch
d3-chat Patch
references-d3-openai-agents Patch
references-nextjs-realtime Patch
references-realtime-hooks-test Patch
references-realtime-streams Patch
references-telemetry Patch
@internal/sdk-compat-tests Patch
@trigger.dev/react-hooks Patch
@trigger.dev/rsc Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 20, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Introduces a durable Session primitive end-to-end: a new Prisma Session model and migration, a ClickHouse sessions_v1 table and query/insert helpers, ClickHouse-backed SessionsRepository, a SessionsReplicationService that streams Postgres logical replication into ClickHouse (with retry/ack/flush/leader-lock logic), session-friendly ID export (SessionId) and API Zod schemas, multiple REST and realtime routes for session CRUD, streaming and append, session-stream waitpoint support with Redis-backed pending sets, environment config and startup wiring, helper utilities, and end-to-end replication tests.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 35.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: Sessions - bidirectional durable agent streams' clearly summarizes the main change, specifying the new Sessions feature with its core capability of bidirectional streaming.
Description check ✅ Passed The PR description comprehensively covers the feature scope, use cases, public API surface, implementation details, and verification steps. However, the description lacks the required checklist items from the template (contributing guide, PR title convention, testing confirmation) and does not follow the template structure with explicit Changelog and Screenshots sections.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/tri-8627-session-primitive-server-side-schema-routes-clickhouse

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

coderabbitai[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

devin-ai-integration[bot]

This comment was marked as resolved.

Durable, typed, bidirectional I/O primitive that outlives a single run.
Ship target is agent/chat use cases; run-scoped streams.pipe/streams.input
are untouched and do not create Session rows.

Postgres
- New Session table: id, friendlyId, externalId, type (plain string),
  denormalised project/environment/organization scalar columns (no FKs),
  taskIdentifier, tags String[], metadata Json, closedAt, closedReason,
  expiresAt, timestamps
- Point-lookup indexes only (friendlyId unique, (env, externalId) unique,
  expiresAt). List queries are served from ClickHouse so Postgres stays
  minimal and insert-heavy.

Control-plane API
- POST   /api/v1/sessions           create (idempotent via externalId)
- GET    /api/v1/sessions           list with filters (type, tag,
                                     taskIdentifier, externalId, status
                                     ACTIVE|CLOSED|EXPIRED, period/from/to)
                                     and cursor pagination, ClickHouse-backed
- GET    /api/v1/sessions/:session  retrieve — polymorphic: `session_` prefix
                                     hits friendlyId, otherwise externalId
- PATCH  /api/v1/sessions/:session  update tags/metadata/externalId
- POST   /api/v1/sessions/:session/close  terminal close (idempotent)

Realtime (S2-backed)
- PUT    /realtime/v1/sessions/:session/:io           returns S2 creds
- GET    /realtime/v1/sessions/:session/:io           SSE subscribe
- POST   /realtime/v1/sessions/:session/:io/append    server-side append
- S2 key format: sessions/{friendlyId}/{out|in}

Auth
- sessions added to ResourceTypes. read:sessions:{id},
  write:sessions:{id}, admin:sessions:{id} scopes work via existing JWT
  validation.

ClickHouse
- sessions_v1 ReplacingMergeTree table
- SessionsReplicationService mirrors RunsReplicationService exactly:
  logical replication with leader-locked consumer, ConcurrentFlushScheduler,
  retry with exponential backoff + jitter, identical metric shape.
  Dedicated slot + publication (sessions_to_clickhouse_v1[_publication]).
- SessionsRepository + ClickHouseSessionsRepository expose list, count,
  tags with cursor pagination keyed by (created_at DESC, session_id DESC).
- Derived status (ACTIVE/CLOSED/EXPIRED) computed from closed_at + expires_at;
  in-memory fallback on list results to catch pre-replication writes.

Verification
- Webapp typecheck 10/10
- Core + SDK build 3/3
- sessionsReplicationService.test.ts integration tests 2/2 (insert + update
  round-trip via testcontainers)
- Live round-trip against local dev: create -> retrieve (friendlyId and
  externalId) -> out.initialize -> out.append x2 -> in.send -> out.subscribe
  (receives records) -> close -> ClickHouse sessions_v1 shows the replicated
  row with closed_reason
- Live list smoke: tag, type, status CLOSED, externalId, and cursor pagination
…te/update

The session_ prefix identifies internal friendlyIds. Allowing it in a
user-supplied externalId would misroute subsequent GET/PATCH/close
requests through resolveSessionByIdOrExternalId to a friendlyId lookup,
returning null or the wrong session. Reject at the schema boundary so
both routes surface a clean 422.
Without allowJWT/corsStrategy, frontend clients holding public access
tokens hit 401 on GET /api/v1/sessions and browser preflights fail.
Matches the single-session GET/PATCH/close routes and the runs list
endpoint.
- Derive isCached from the upsert result (id mismatch = pre-existing row)
  instead of doing a separate findFirst first. The pre-check was racy —
  two concurrent first-time POSTs could both return 201 with
  isCached: false. Using the returned row's id is atomic and saves a
  round-trip.

- Scope the list endpoint's authorization to the standard action/resource
  pattern (matches api.v1.runs.ts): task-scoped JWTs can list sessions
  filtered by their task, and broader super-scopes (read:sessions,
  read:all, admin) authorize unfiltered listing.

- Log and swallow unexpected errors on POST rather than returning the
  raw error.message. Prisma/internal messages can leak column names and
  query fragments.
Give Session channels run-engine waitpoint semantics so a task can
suspend while idle on a session channel and resume when an external
client sends a record — parallel to what streams.input offers
run-scoped streams.

Webapp
- POST /api/v1/runs/:runFriendlyId/session-streams/wait — creates a
  manual waitpoint attached to {sessionId, io} and race-checks the S2
  stream starting at lastSeqNum so pre-arrived data fires it
  immediately. Mirrors the existing input-stream waitpoint route.
- sessionStreamWaitpointCache.server.ts — Redis set keyed on
  {sessionFriendlyId, io}, drained atomically on each append so
  concurrent multi-tab waiters all wake together.
- realtime.v1.sessions.$session.$io.append now drains pending
  waitpoints after every record lands and completes each with the
  appended body.
- S2RealtimeStreams.readSessionStreamRecords — session-channel
  parallel of readRecords, feeds the race-check path.

Core
- CreateSessionStreamWaitpoint request/response schemas alongside
  the existing Session CRUD schemas. Server API contract only —
  the client ApiClient + SDK wrapper ship on the AI-chat branch.
@ericallam ericallam force-pushed the feature/tri-8627-session-primitive-server-side-schema-routes-clickhouse branch from 2210fe2 to 4cadc19 Compare April 23, 2026 09:10
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 11 additional findings in Devin Review.

Open in Devin Review

Comment on lines +52 to +53
await redis.sadd(key, waitpointId);
await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 pexpire on Redis set resets TTL for all waitpoints, causing premature eviction of longer-lived registrations

When multiple waitpoints with different timeouts are registered on the same (sessionFriendlyId, io) Redis set, each call to addSessionStreamWaitpoint invokes pexpire on the shared key, resetting the TTL for the entire set to the latest caller's value. If waitpoint A is registered with a 1-hour timeout, and waitpoint B is later registered on the same channel with a 5-minute timeout, the pexpire at line 53 overwrites the key TTL to 5 minutes. After 5 minutes the key expires, removing waitpoint A's registration from Redis. When data subsequently arrives on the channel (between 5 min and 1 hour), the append handler's drainSessionStreamWaitpoints call returns an empty set and waitpoint A is never completed — it will only time out via the engine's internal timeout mechanism instead of being fulfilled with the actual data.

The existing inputStreamWaitpointCache.server.ts avoids this because it stores a single string value per key (one waitpoint per (run, stream) pair), whereas sessions intentionally support multiple concurrent waiters via a Redis set.

Prompt for agents
In addSessionStreamWaitpoint (sessionStreamWaitpointCache.server.ts), the pexpire call at line 53 resets the TTL for the entire Redis set to the latest waitpoint's value. When multiple waitpoints with different timeouts coexist on the same (sessionFriendlyId, io) key, a shorter-TTL waitpoint added later will prematurely expire the key, evicting longer-lived waitpoint registrations.

Possible approaches:
1. Use the maximum of the current key TTL and the new TTL: read the current PTTL of the key, and only call pexpire if the new TTL is greater. This can be done atomically with a Lua script.
2. Store each waitpoint as a separate Redis key with its own TTL (e.g. ssw:{sessionId}:{io}:{waitpointId}), and use a SCAN/pattern match in drainSessionStreamWaitpoints. This trades atomicity of the drain for per-waitpoint TTL correctness.
3. Always use DEFAULT_TTL_MS for the key expiry (since it is already 7 days) and let the engine's internal timeout mechanism handle waitpoint-level expiry. Only use the custom ttlMs if it is larger than the current TTL.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

♻️ Duplicate comments (2)
apps/webapp/app/routes/api.v1.sessions.$session.ts (1)

30-33: ⚠️ Potential issue | 🟡 Minor

Filter out missing externalId before authorization.

Line 32 passes "" as a session identifier when externalId is null. Build the resource list conditionally so an empty string cannot match a malformed or permissive scope.

Proposed fix
-      resource: (session) => ({ sessions: [session.friendlyId, session.externalId ?? ""] }),
+      resource: (session) => ({
+        sessions: session.externalId
+          ? [session.friendlyId, session.externalId]
+          : [session.friendlyId],
+      }),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/api.v1.sessions`.$session.ts around lines 30 - 33, The
authorization resource builder currently includes session.externalId ?? "" which
can add an empty string as an identifier; update the resource function used in
the authorization object (resource: (session) => ...) to only include
session.externalId when it is non-null/defined (e.g., build an array starting
with session.friendlyId and push session.externalId only if truthy) so the
resulting sessions list contains no empty-string entries and cannot match
malformed/permissive scopes.
apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts (1)

77-85: ⚠️ Potential issue | 🟠 Major

Return a generic 500 for append failures.

Line 84 surfaces appendError.message for non-validation failures. Log the internal error server-side and return a stable generic message to avoid leaking S2/client/internal details.

Proposed fix
       if (appendError instanceof ServiceValidationError) {
         return json(
           { ok: false, error: appendError.message },
           { status: appendError.status ?? 422 }
         );
       }
-      return json({ ok: false, error: appendError.message }, { status: 500 });
+      logger.error("Failed to append to session stream", {
+        sessionFriendlyId: session.friendlyId,
+        io: params.io,
+        error: appendError,
+      });
+      return json({ ok: false, error: "Something went wrong" }, { status: 500 });
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/realtime.v1.sessions`.$session.$io.append.ts around
lines 77 - 85, The current error handling block returns appendError.message for
non-validation errors; change it so that when appendError is not an instance of
ServiceValidationError you log the full error server-side (e.g., using the
existing logger or console.error) and return a stable generic JSON error message
via json({ ok: false, error: "Internal server error" }, { status: 500 }) instead
of exposing appendError.message; keep the ServiceValidationError branch
unchanged and reference the same symbols appendError, ServiceValidationError,
and json to locate and update the logic.
🧹 Nitpick comments (3)
apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts (1)

76-84: Recommended: parallelize createWaitpointTag calls.

The sequential await in the loop serializes up to MAX_TAGS_PER_WAITPOINT round-trips on the hot path. A Promise.all over bodyTags.map(...) is a trivial change with meaningful latency savings and no correctness impact (each createWaitpointTag is scoped by env+project+tag).

♻️ Suggested fix
-      if (bodyTags && bodyTags.length > 0) {
-        for (const tag of bodyTags) {
-          await createWaitpointTag({
-            tag,
-            environmentId: authentication.environment.id,
-            projectId: authentication.environment.projectId,
-          });
-        }
-      }
+      if (bodyTags && bodyTags.length > 0) {
+        await Promise.all(
+          bodyTags.map((tag) =>
+            createWaitpointTag({
+              tag,
+              environmentId: authentication.environment.id,
+              projectId: authentication.environment.projectId,
+            })
+          )
+        );
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/api.v1.runs`.$runFriendlyId.session-streams.wait.ts
around lines 76 - 84, The loop is awaiting createWaitpointTag sequentially which
serializes network calls; replace the for-await pattern with a parallelized call
like Promise.all(bodyTags.map(tag => createWaitpointTag({ tag, environmentId:
authentication.environment.id, projectId: authentication.environment.projectId
}))) so all createWaitpointTag calls run concurrently (guard as before with if
(bodyTags && bodyTags.length > 0)); keep the same payload shape and await the
Promise.all result to preserve error propagation.
apps/webapp/test/sessionsReplicationService.test.ts (1)

1-205: Move this test beside the service under test.

This new suite targets ~/services/sessionsReplicationService.server, but it lives in apps/webapp/test/. Please place it next to the service file using the repository’s adjacent test naming convention.

As per coding guidelines, “Test files should live beside the files under test and use descriptive describe and it blocks” and “Place test files next to source files with naming pattern MyService.ts -> MyService.test.ts.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/test/sessionsReplicationService.test.ts` around lines 1 - 205,
The test suite for SessionsReplicationService is in the wrong place; move this
test file so it sits adjacent to the SessionsReplicationService implementation
(the module that exports SessionsReplicationService /
sessionsReplicationService.server) and rename it to follow the adjacent naming
convention (e.g., SessionsReplicationService.test.ts). Update all imports that
use the app-root alias (~/services/sessionsReplicationService.server) to
relative imports pointing to the service file, and ensure any test helpers like
containerTest, ClickHouse, and prisma imports resolve correctly from the new
location; keep the describe/it semantics but ensure the file name matches the
service-to-test mapping.
packages/core/src/v3/schemas/api.ts (1)

1502-1502: Nit: reject empty taskIdentifier.

z.string().max(128).optional() accepts "", which will then be stored as-is and confuse downstream lookups/filters that treat absence vs. empty differently. Align with externalId by trimming and requiring at least one character.

-  taskIdentifier: z.string().max(128).optional(),
+  taskIdentifier: z.string().trim().min(1).max(128).optional(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/schemas/api.ts` at line 1502, The taskIdentifier schema
currently allows empty strings; update the validator for taskIdentifier (in the
API schema where taskIdentifier: z.string().max(128).optional()) to reject empty
values by trimming and requiring at least one character—e.g., use
z.string().trim().min(1).max(128).optional()—so it aligns with externalId and
avoids storing "" versus absent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/webapp/app/entry.server.tsx`:
- Around line 45-52: The SIGTERM/SIGINT handlers currently bind the async method
sessionsReplicationInstance.shutdown directly, which can produce unhandled
promise rejections; change the handlers registered via
signalsEmitter.on("SIGTERM", ...) and signalsEmitter.on("SIGINT", ...) to call
an async wrapper that invokes sessionsReplicationInstance.shutdown() and
attaches a .catch(...) to log or swallow errors (mirror the pattern used in
dynamicFlushScheduler.server for safe async signal handling), ensuring any
rejection from _replicationClient.stop() is handled.

In `@apps/webapp/app/routes/api.v1.runs`.$runFriendlyId.session-streams.wait.ts:
- Around line 143-161: The empty catch after the race-check should not swallow
errors — replace it with a logger.warn/error that includes the context
(sessionFriendlyId, io, and WaitpointId.toFriendlyId(result.waitpoint.id) or
waitpointId variable) and the caught error so ops can distinguish cached-drain
vs broken race-checks; and in the outer catch branch that currently does `return
json({ error: error.message }, { status: 500 })` do not return raw internal
messages — log the error (with sessionFriendlyId/io/waitpointId/context) and
return a generic 500 response like `{ error: "Something went wrong" }` while
preserving the existing 422 handling for ServiceValidationError.

In `@apps/webapp/app/routes/api.v1.sessions`.$session.close.ts:
- Around line 45-55: The close endpoint currently does a read-then-update which
has a TOCTOU race: when updating the session in the block that calls
prisma.session.update (using existing.id and setting closedAt/closedReason),
change the update to be conditional by adding closedAt: null to the where clause
so the DB only writes if not already closed; after the conditional update, if no
row was affected (update returned null or zero), re-query the session (the
existing/serializeSession path) and return that value to ensure idempotent,
write-once behavior for closedAt/closedReason.

In `@apps/webapp/app/routes/api.v1.sessions.ts`:
- Around line 88-93: The create session POST route uses createActionApiRoute but
is missing the auth/CORS flags; update the options object passed to
createActionApiRoute for the create action (the one with body:
CreateSessionRequestBody and method: "POST") to include allowJWT: true and
corsStrategy: "all" so that JWT-scoped clients and cross-origin requests are
permitted for the create endpoint.

In `@apps/webapp/app/routes/realtime.v1.sessions`.$session.$io.ts:
- Around line 21-35: The route options passed into createActionApiRoute should
include method: "PUT" so framework-level HTTP method validation runs before the
handler; update the options object (the first argument to createActionApiRoute
that currently lists params: ParamsSchema, allowJWT, corsStrategy,
authorization) to add method: "PUT" (referencing createActionApiRoute and
ParamsSchema to locate the block) so the PUT-only enforcement happens earlier in
the request lifecycle.

In `@apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts`:
- Around line 101-113: Replace the exported interface ISessionsRepository with
an exported type alias that preserves the exact shape (name: string and the same
method signatures for listSessionIds, listSessions, countSessions, and listTags)
so callers and implementations (e.g., any classes or functions referencing
ISessionsRepository) keep the same contract; update the declaration of
ISessionsRepository to use the TypeScript "type" keyword and ensure the
referenced types ListSessionsOptions, ListedSession, SessionListInputOptions,
SessionTagListOptions, and SessionTagList remain unchanged.

In `@apps/webapp/app/services/sessionStreamWaitpointCache.server.ts`:
- Around line 50-53: Replace separate SADD and PEXPIRE calls with a Redis
transaction so both operations succeed or fail together: use redis.multi() (or
pipeline()) to queue redis.sadd(buildKey(sessionFriendlyId, io), waitpointId)
and redis.pexpire(buildKey(sessionFriendlyId, io), ttlMs ?? DEFAULT_TTL_MS) and
then await exec()/execAsync(), check the exec result for errors, and handle
failures the same way you would for the try/catch around the current redis calls
so the key cannot be left without a TTL.

In `@packages/core/src/v3/schemas/api.ts`:
- Around line 1556-1559: Rename the CloseSessionRequestBody schema field from
reason to closedReason to match the route mapping and response shape: update the
z.object in CloseSessionRequestBody to use closedReason:
z.string().max(256).optional(), keep the exported type CloseSessionRequestBody =
z.infer<typeof CloseSessionRequestBody> unchanged, and run a quick grep for
CloseSessionRequestBody usage to adjust any callers expecting the old reason
property.

---

Duplicate comments:
In `@apps/webapp/app/routes/api.v1.sessions`.$session.ts:
- Around line 30-33: The authorization resource builder currently includes
session.externalId ?? "" which can add an empty string as an identifier; update
the resource function used in the authorization object (resource: (session) =>
...) to only include session.externalId when it is non-null/defined (e.g., build
an array starting with session.friendlyId and push session.externalId only if
truthy) so the resulting sessions list contains no empty-string entries and
cannot match malformed/permissive scopes.

In `@apps/webapp/app/routes/realtime.v1.sessions`.$session.$io.append.ts:
- Around line 77-85: The current error handling block returns
appendError.message for non-validation errors; change it so that when
appendError is not an instance of ServiceValidationError you log the full error
server-side (e.g., using the existing logger or console.error) and return a
stable generic JSON error message via json({ ok: false, error: "Internal server
error" }, { status: 500 }) instead of exposing appendError.message; keep the
ServiceValidationError branch unchanged and reference the same symbols
appendError, ServiceValidationError, and json to locate and update the logic.

---

Nitpick comments:
In `@apps/webapp/app/routes/api.v1.runs`.$runFriendlyId.session-streams.wait.ts:
- Around line 76-84: The loop is awaiting createWaitpointTag sequentially which
serializes network calls; replace the for-await pattern with a parallelized call
like Promise.all(bodyTags.map(tag => createWaitpointTag({ tag, environmentId:
authentication.environment.id, projectId: authentication.environment.projectId
}))) so all createWaitpointTag calls run concurrently (guard as before with if
(bodyTags && bodyTags.length > 0)); keep the same payload shape and await the
Promise.all result to preserve error propagation.

In `@apps/webapp/test/sessionsReplicationService.test.ts`:
- Around line 1-205: The test suite for SessionsReplicationService is in the
wrong place; move this test file so it sits adjacent to the
SessionsReplicationService implementation (the module that exports
SessionsReplicationService / sessionsReplicationService.server) and rename it to
follow the adjacent naming convention (e.g.,
SessionsReplicationService.test.ts). Update all imports that use the app-root
alias (~/services/sessionsReplicationService.server) to relative imports
pointing to the service file, and ensure any test helpers like containerTest,
ClickHouse, and prisma imports resolve correctly from the new location; keep the
describe/it semantics but ensure the file name matches the service-to-test
mapping.

In `@packages/core/src/v3/schemas/api.ts`:
- Line 1502: The taskIdentifier schema currently allows empty strings; update
the validator for taskIdentifier (in the API schema where taskIdentifier:
z.string().max(128).optional()) to reject empty values by trimming and requiring
at least one character—e.g., use z.string().trim().min(1).max(128).optional()—so
it aligns with externalId and avoids storing "" versus absent.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: bfe7b4b3-0bee-49c6-893c-6b4cc70e98af

📥 Commits

Reviewing files that changed from the base of the PR and between 2210fe2 and 4cadc19.

📒 Files selected for processing (28)
  • .changeset/session-primitive.md
  • .server-changes/session-primitive.md
  • apps/webapp/app/entry.server.tsx
  • apps/webapp/app/env.server.ts
  • apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts
  • apps/webapp/app/routes/api.v1.sessions.$session.close.ts
  • apps/webapp/app/routes/api.v1.sessions.$session.ts
  • apps/webapp/app/routes/api.v1.sessions.ts
  • apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
  • apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts
  • apps/webapp/app/services/authorization.server.ts
  • apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
  • apps/webapp/app/services/realtime/sessions.server.ts
  • apps/webapp/app/services/sessionStreamWaitpointCache.server.ts
  • apps/webapp/app/services/sessionsReplicationInstance.server.ts
  • apps/webapp/app/services/sessionsReplicationService.server.ts
  • apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts
  • apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts
  • apps/webapp/app/v3/services/adminWorker.server.ts
  • apps/webapp/test/sessionsReplicationService.test.ts
  • internal-packages/clickhouse/schema/030_create_sessions_v1.sql
  • internal-packages/clickhouse/src/index.ts
  • internal-packages/clickhouse/src/sessions.ts
  • internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql
  • internal-packages/database/prisma/schema.prisma
  • packages/core/src/v3/isomorphic/friendlyId.ts
  • packages/core/src/v3/schemas/api.ts
✅ Files skipped from review due to trivial changes (6)
  • packages/core/src/v3/isomorphic/friendlyId.ts
  • apps/webapp/app/services/authorization.server.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts
  • apps/webapp/app/services/sessionsReplicationInstance.server.ts
  • internal-packages/clickhouse/schema/030_create_sessions_v1.sql
  • apps/webapp/app/services/sessionsReplicationService.server.ts
🚧 Files skipped from review as they are similar to previous changes (6)
  • apps/webapp/app/v3/services/adminWorker.server.ts
  • .changeset/session-primitive.md
  • internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql
  • internal-packages/clickhouse/src/index.ts
  • apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts
  • apps/webapp/app/services/realtime/s2realtimeStreams.server.ts

Comment on lines +45 to +52
signalsEmitter.on(
"SIGTERM",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
signalsEmitter.on(
"SIGINT",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Wrap async shutdown signal handlers with .catch to avoid unhandled rejections.

sessionsReplicationInstance.shutdown is async and can reject (e.g., _replicationClient.stop() errors). Without a .catch, a rejection during SIGTERM/SIGINT becomes an unhandled promise rejection while the process is already tearing down. This is the pattern used elsewhere in the codebase (see apps/webapp/app/v3/dynamicFlushScheduler.server.ts line 145-160).

♻️ Proposed fix
-  signalsEmitter.on(
-    "SIGTERM",
-    sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
-  );
-  signalsEmitter.on(
-    "SIGINT",
-    sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
-  );
+  const shutdownSessionsReplication = () =>
+    sessionsReplicationInstance.shutdown().catch((error) => {
+      console.error("🗃️ Sessions replication service shutdown error", { error });
+    });
+  signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
+  signalsEmitter.on("SIGINT", shutdownSessionsReplication);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/entry.server.tsx` around lines 45 - 52, The SIGTERM/SIGINT
handlers currently bind the async method sessionsReplicationInstance.shutdown
directly, which can produce unhandled promise rejections; change the handlers
registered via signalsEmitter.on("SIGTERM", ...) and signalsEmitter.on("SIGINT",
...) to call an async wrapper that invokes
sessionsReplicationInstance.shutdown() and attaches a .catch(...) to log or
swallow errors (mirror the pattern used in dynamicFlushScheduler.server for safe
async signal handling), ensuring any rejection from _replicationClient.stop() is
handled.

Comment on lines +143 to +161
} catch {
// Non-fatal: pending registration stays in Redis; the next append
// will complete the waitpoint via the append handler path.
}
}

return json<CreateSessionStreamWaitpointResponseBody>({
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
isCached: result.isCached,
});
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 500 });
}

return json({ error: "Something went wrong" }, { status: 500 });
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Consider logging the swallowed race-check error and not leaking raw error messages.

Two related concerns in the error-handling tail:

  1. Line 143-146: the catch {} silently drops any failure from the race-check (S2 read / engine complete / cache remove). If this path has latent bugs you'll only notice via timeouts, since nothing is recorded. Please at least logger.warn/logger.error with sessionFriendlyId, io, and waitpointId so ops can tell cached-drain-on-append from broken race-checks.
  2. Line 156-158: return json({ error: error.message }, { status: 500 }) forwards arbitrary internal error messages (e.g., Prisma, engine, S2) to the client. Per the PR commit message "log and swallow unexpected internal errors instead of returning raw Prisma/internal messages that could leak details", this route should follow the same pattern — log the error server-side and return a generic 500 message.
♻️ Suggested fix
-        } catch {
-          // Non-fatal: pending registration stays in Redis; the next append
-          // will complete the waitpoint via the append handler path.
-        }
+        } catch (error) {
+          // Non-fatal: pending registration stays in Redis; the next append
+          // will complete the waitpoint via the append handler path.
+          logger.warn("session-stream wait race-check failed", {
+            sessionFriendlyId: session.friendlyId,
+            io: body.io,
+            waitpointId: result.waitpoint.id,
+            error,
+          });
+        }
...
-    } catch (error) {
-      if (error instanceof ServiceValidationError) {
-        return json({ error: error.message }, { status: 422 });
-      } else if (error instanceof Error) {
-        return json({ error: error.message }, { status: 500 });
-      }
-
-      return json({ error: "Something went wrong" }, { status: 500 });
-    }
+    } catch (error) {
+      if (error instanceof ServiceValidationError) {
+        return json({ error: error.message }, { status: 422 });
+      }
+      logger.error("Failed to create session-stream waitpoint", { error });
+      return json({ error: "Something went wrong" }, { status: 500 });
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/api.v1.runs`.$runFriendlyId.session-streams.wait.ts
around lines 143 - 161, The empty catch after the race-check should not swallow
errors — replace it with a logger.warn/error that includes the context
(sessionFriendlyId, io, and WaitpointId.toFriendlyId(result.waitpoint.id) or
waitpointId variable) and the caught error so ops can distinguish cached-drain
vs broken race-checks; and in the outer catch branch that currently does `return
json({ error: error.message }, { status: 500 })` do not return raw internal
messages — log the error (with sessionFriendlyId/io/waitpointId/context) and
return a generic 500 response like `{ error: "Something went wrong" }` while
preserving the existing 422 handling for ServiceValidationError.

Comment on lines +45 to +55
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(serializeSession(existing));
}

const updated = await prisma.session.update({
where: { id: existing.id },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor: idempotent close has a TOCTOU — concurrent closes can overwrite closedAt/closedReason.

Two concurrent POST /close calls can both observe existing.closedAt == null and both run the update, so the second overwrites the first's closedAt timestamp and closedReason. Given the PR description states terminal markers are "write-once, never flipped back", you can enforce that at the DB level by scoping the update with closedAt: null and falling back to re-reading when the update affects zero rows.

♻️ Suggested fix
-    const updated = await prisma.session.update({
-      where: { id: existing.id },
-      data: {
-        closedAt: new Date(),
-        closedReason: body.reason ?? null,
-      },
-    });
-
-    return json<RetrieveSessionResponseBody>(serializeSession(updated));
+    const { count } = await prisma.session.updateMany({
+      where: { id: existing.id, closedAt: null },
+      data: {
+        closedAt: new Date(),
+        closedReason: body.reason ?? null,
+      },
+    });
+
+    const final =
+      count === 0
+        ? await prisma.session.findFirst({ where: { id: existing.id } })
+        : await prisma.session.findFirst({ where: { id: existing.id } });
+
+    if (!final) {
+      return json({ error: "Session not found" }, { status: 404 });
+    }
+
+    return json<RetrieveSessionResponseBody>(serializeSession(final));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(serializeSession(existing));
}
const updated = await prisma.session.update({
where: { id: existing.id },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(serializeSession(existing));
}
const { count } = await prisma.session.updateMany({
where: { id: existing.id, closedAt: null },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});
const final = await prisma.session.findFirst({ where: { id: existing.id } });
if (!final) {
return json({ error: "Session not found" }, { status: 404 });
}
return json<RetrieveSessionResponseBody>(serializeSession(final));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/api.v1.sessions`.$session.close.ts around lines 45 -
55, The close endpoint currently does a read-then-update which has a TOCTOU
race: when updating the session in the block that calls prisma.session.update
(using existing.id and setting closedAt/closedReason), change the update to be
conditional by adding closedAt: null to the where clause so the DB only writes
if not already closed; after the conditional update, if no row was affected
(update returned null or zero), re-query the session (the
existing/serializeSession path) and return that value to ensure idempotent,
write-once behavior for closedAt/closedReason.

Comment on lines +88 to +93
const { action } = createActionApiRoute(
{
body: CreateSessionRequestBody,
method: "POST",
maxContentLength: 1024 * 32, // 32KB — metadata is the only thing that grows
},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Check whether createActionApiRoute defaults allow JWT auth/CORS for actions.
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec sed -n '1,260p' {}

Repository: triggerdotdev/trigger.dev

Length of output: 8654


🏁 Script executed:

#!/bin/bash
# Find createActionApiRoute function and check defaults
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec grep -n "createActionApiRoute\|allowJWT\|corsStrategy" {} | head -50

Repository: triggerdotdev/trigger.dev

Length of output: 2433


🏁 Script executed:

#!/bin/bash
# Read the full apiBuilder.server.ts to locate createActionApiRoute
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec wc -l {}

Repository: triggerdotdev/trigger.dev

Length of output: 133


🏁 Script executed:

#!/bin/bash
# Get the sessions route file and check all route configurations
cat -n apps/webapp/app/routes/api.v1.sessions.ts | head -150

Repository: triggerdotdev/trigger.dev

Length of output: 6980


🏁 Script executed:

#!/bin/bash
# Find other session route files mentioned in the review
fd -type f 'api\.v1\.sessions' apps/webapp/app/routes --exec basename {} \; | sort

Repository: triggerdotdev/trigger.dev

Length of output: 239


🏁 Script executed:

#!/bin/bash
# Check realtime session routes for JWT/CORS configuration
fd -type f 'realtime.*session' apps/webapp/app/routes --exec grep -l 'allowJWT\|corsStrategy' {} \;

Repository: triggerdotdev/trigger.dev

Length of output: 239


🏁 Script executed:

#!/bin/bash
# Find other session route files
fd 'api\.v1\.sessions' apps/webapp/app/routes | sort

Repository: triggerdotdev/trigger.dev

Length of output: 218


🏁 Script executed:

#!/bin/bash
# Find realtime session route files
fd 'realtime.*sessions' apps/webapp/app/routes | sort

Repository: triggerdotdev/trigger.dev

Length of output: 195


🏁 Script executed:

#!/bin/bash
# Check the main sessions routes for retrieve/update/realtime patterns
ls -la apps/webapp/app/routes/ | grep -i session | head -20

Repository: triggerdotdev/trigger.dev

Length of output: 593


🏁 Script executed:

#!/bin/bash
# Check retrieve/update route
cat apps/webapp/app/routes/api.v1.sessions.\$session.ts | head -80

Repository: triggerdotdev/trigger.dev

Length of output: 2415


🏁 Script executed:

#!/bin/bash
# Check close route
cat apps/webapp/app/routes/api.v1.sessions.\$session.close.ts

Repository: triggerdotdev/trigger.dev

Length of output: 1722


🏁 Script executed:

#!/bin/bash
# Check realtime session route
cat apps/webapp/app/routes/realtime.v1.sessions.\$session.\$io.ts | head -80

Repository: triggerdotdev/trigger.dev

Length of output: 2644


🏁 Script executed:

#!/bin/bash
# Check realtime append route
cat apps/webapp/app/routes/realtime.v1.sessions.\$session.\$io.append.ts | head -80

Repository: triggerdotdev/trigger.dev

Length of output: 3003


Enable JWT auth and CORS on the create endpoint.

The list, retrieve, update, close, realtime, and append session routes all explicitly set allowJWT: true and corsStrategy: "all", but the POST /api/v1/sessions create endpoint does not. This causes it to default to allowJWT: false and corsStrategy: "none", making session creation inaccessible to JWT-scoped clients.

Proposed fix
   {
     body: CreateSessionRequestBody,
     method: "POST",
     maxContentLength: 1024 * 32, // 32KB — metadata is the only thing that grows
+    allowJWT: true,
+    corsStrategy: "all",
   },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/api.v1.sessions.ts` around lines 88 - 93, The create
session POST route uses createActionApiRoute but is missing the auth/CORS flags;
update the options object passed to createActionApiRoute for the create action
(the one with body: CreateSessionRequestBody and method: "POST") to include
allowJWT: true and corsStrategy: "all" so that JWT-scoped clients and
cross-origin requests are permitted for the create endpoint.

Comment on lines +21 to +35
const { action } = createActionApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "write",
resource: (params) => ({ sessions: params.session }),
superScopes: ["write:sessions", "write:all", "admin"],
},
},
async ({ request, params, authentication }) => {
if (request.method !== "PUT") {
return new Response("Method not allowed", { status: 405 });
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Confirm the default method behavior for createActionApiRoute.
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec sed -n '1,260p' {}

Repository: triggerdotdev/trigger.dev

Length of output: 8654


🏁 Script executed:

fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec wc -l {}

Repository: triggerdotdev/trigger.dev

Length of output: 133


🏁 Script executed:

fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec sed -n '260,600p' {}

Repository: triggerdotdev/trigger.dev

Length of output: 11086


🏁 Script executed:

cat -n apps/webapp/app/routes/realtime.v1.sessions.\$session.\$io.ts | head -50

Repository: triggerdotdev/trigger.dev

Length of output: 2087


Add method: "PUT" to the route options.

The route builder accepts an optional method parameter, which when provided, enforces HTTP method validation at the framework level before the handler executes. Without it, the method check is skipped in the route builder, leaving only the manual check inside the handler. Set method: "PUT" explicitly so validation occurs earlier in the request lifecycle.

Proposed fix
const { action } = createActionApiRoute(
  {
    params: ParamsSchema,
+   method: "PUT",
    allowJWT: true,
    corsStrategy: "all",
    authorization: {
      action: "write",
      resource: (params) => ({ sessions: params.session }),
      superScopes: ["write:sessions", "write:all", "admin"],
    },
  },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/routes/realtime.v1.sessions`.$session.$io.ts around lines 21
- 35, The route options passed into createActionApiRoute should include method:
"PUT" so framework-level HTTP method validation runs before the handler; update
the options object (the first argument to createActionApiRoute that currently
lists params: ParamsSchema, allowJWT, corsStrategy, authorization) to add
method: "PUT" (referencing createActionApiRoute and ParamsSchema to locate the
block) so the PUT-only enforcement happens earlier in the request lifecycle.

Comment on lines +101 to +113
export interface ISessionsRepository {
name: string;
listSessionIds(options: ListSessionsOptions): Promise<string[]>;
listSessions(options: ListSessionsOptions): Promise<{
sessions: ListedSession[];
pagination: {
nextCursor: string | null;
previousCursor: string | null;
};
}>;
countSessions(options: SessionListInputOptions): Promise<number>;
listTags(options: SessionTagListOptions): Promise<SessionTagList>;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Prefer type over interface per coding guidelines.

As per coding guidelines (**/*.{ts,tsx}: "Use types over interfaces for TypeScript"), convert ISessionsRepository to a type alias.

♻️ Proposed fix
-export interface ISessionsRepository {
-  name: string;
-  listSessionIds(options: ListSessionsOptions): Promise<string[]>;
-  listSessions(options: ListSessionsOptions): Promise<{
-    sessions: ListedSession[];
-    pagination: {
-      nextCursor: string | null;
-      previousCursor: string | null;
-    };
-  }>;
-  countSessions(options: SessionListInputOptions): Promise<number>;
-  listTags(options: SessionTagListOptions): Promise<SessionTagList>;
-}
+export type ISessionsRepository = {
+  name: string;
+  listSessionIds(options: ListSessionsOptions): Promise<string[]>;
+  listSessions(options: ListSessionsOptions): Promise<{
+    sessions: ListedSession[];
+    pagination: {
+      nextCursor: string | null;
+      previousCursor: string | null;
+    };
+  }>;
+  countSessions(options: SessionListInputOptions): Promise<number>;
+  listTags(options: SessionTagListOptions): Promise<SessionTagList>;
+};
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export interface ISessionsRepository {
name: string;
listSessionIds(options: ListSessionsOptions): Promise<string[]>;
listSessions(options: ListSessionsOptions): Promise<{
sessions: ListedSession[];
pagination: {
nextCursor: string | null;
previousCursor: string | null;
};
}>;
countSessions(options: SessionListInputOptions): Promise<number>;
listTags(options: SessionTagListOptions): Promise<SessionTagList>;
}
export type ISessionsRepository = {
name: string;
listSessionIds(options: ListSessionsOptions): Promise<string[]>;
listSessions(options: ListSessionsOptions): Promise<{
sessions: ListedSession[];
pagination: {
nextCursor: string | null;
previousCursor: string | null;
};
}>;
countSessions(options: SessionListInputOptions): Promise<number>;
listTags(options: SessionTagListOptions): Promise<SessionTagList>;
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts`
around lines 101 - 113, Replace the exported interface ISessionsRepository with
an exported type alias that preserves the exact shape (name: string and the same
method signatures for listSessionIds, listSessions, countSessions, and listTags)
so callers and implementations (e.g., any classes or functions referencing
ISessionsRepository) keep the same contract; update the declaration of
ISessionsRepository to use the TypeScript "type" keyword and ensure the
referenced types ListSessionsOptions, ListedSession, SessionListInputOptions,
SessionTagListOptions, and SessionTagList remain unchanged.

Comment on lines +50 to +53
try {
const key = buildKey(sessionFriendlyId, io);
await redis.sadd(key, waitpointId);
await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Set the waitpoint and TTL in one Redis transaction.

If SADD succeeds and PEXPIRE fails, this key can persist indefinitely. Use multi() so registration and TTL assignment succeed/fail together.

Proposed fix
     const key = buildKey(sessionFriendlyId, io);
-    await redis.sadd(key, waitpointId);
-    await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
+    await redis.multi().sadd(key, waitpointId).pexpire(key, ttlMs ?? DEFAULT_TTL_MS).exec();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/webapp/app/services/sessionStreamWaitpointCache.server.ts` around lines
50 - 53, Replace separate SADD and PEXPIRE calls with a Redis transaction so
both operations succeed or fail together: use redis.multi() (or pipeline()) to
queue redis.sadd(buildKey(sessionFriendlyId, io), waitpointId) and
redis.pexpire(buildKey(sessionFriendlyId, io), ttlMs ?? DEFAULT_TTL_MS) and then
await exec()/execAsync(), check the exec result for errors, and handle failures
the same way you would for the try/catch around the current redis calls so the
key cannot be left without a TTL.

Comment on lines +1556 to +1559
export const CloseSessionRequestBody = z.object({
reason: z.string().max(256).optional(),
});
export type CloseSessionRequestBody = z.infer<typeof CloseSessionRequestBody>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm which field name the close route reads.
fd -t f 'api\.v1\.sessions\.\$session\.close\.ts$' | xargs -r cat
rg -nP '\b(closedReason|body\.reason)\b' -C2

Repository: triggerdotdev/trigger.dev

Length of output: 6643


Rename reason to closedReason for API consistency.

The close request body uses reason, but the route explicitly maps it to the database field closedReason (line 53 of apps/webapp/app/routes/api.v1.sessions.$session.close.ts), and the response schema also uses closedReason. This asymmetry creates an awkward API surface where the request and response field names differ.

Recommended change
 export const CloseSessionRequestBody = z.object({
-  reason: z.string().max(256).optional(),
+  closedReason: z.string().max(256).optional(),
 });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/core/src/v3/schemas/api.ts` around lines 1556 - 1559, Rename the
CloseSessionRequestBody schema field from reason to closedReason to match the
route mapping and response shape: update the z.object in CloseSessionRequestBody
to use closedReason: z.string().max(256).optional(), keep the exported type
CloseSessionRequestBody = z.infer<typeof CloseSessionRequestBody> unchanged, and
run a quick grep for CloseSessionRequestBody usage to adjust any callers
expecting the old reason property.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants