feat: Sessions - bidirectional durable agent streams#3417
🦋 Changeset detected. Latest commit: 4cadc19. The changes in this PR will be included in the next version bump. This PR includes changesets to release 29 packages.
Note: Reviews paused. This branch is under active development, so CodeRabbit has automatically paused its review to avoid an influx of comments on new commits.
Walkthrough

Introduces a durable Session primitive end-to-end: a new Prisma model plus the API, realtime, and replication pieces described below.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Durable, typed, bidirectional I/O primitive that outlives a single run.
Ship target is agent/chat use cases; run-scoped streams.pipe/streams.input
are untouched and do not create Session rows.
Postgres
- New Session table: id, friendlyId, externalId, type (plain string),
denormalised project/environment/organization scalar columns (no FKs),
taskIdentifier, tags String[], metadata Json, closedAt, closedReason,
expiresAt, timestamps
- Point-lookup indexes only (friendlyId unique, (env, externalId) unique,
expiresAt). List queries are served from ClickHouse so Postgres stays
minimal and insert-heavy.
Control-plane API
- POST /api/v1/sessions create (idempotent via externalId)
- GET /api/v1/sessions list with filters (type, tag, taskIdentifier,
  externalId, status ACTIVE|CLOSED|EXPIRED, period/from/to) and cursor
  pagination, ClickHouse-backed
- GET /api/v1/sessions/:session retrieve — polymorphic: `session_` prefix
hits friendlyId, otherwise externalId
- PATCH /api/v1/sessions/:session update tags/metadata/externalId
- POST /api/v1/sessions/:session/close terminal close (idempotent)
Realtime (S2-backed)
- PUT /realtime/v1/sessions/:session/:io returns S2 creds
- GET /realtime/v1/sessions/:session/:io SSE subscribe
- POST /realtime/v1/sessions/:session/:io/append server-side append
- S2 key format: sessions/{friendlyId}/{out|in}
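The key scheme above can be sketched as a pair of pure helpers. The `sessions/{friendlyId}/{out|in}` layout comes from the description; the helper names are hypothetical.

```typescript
// Builds and parses the S2 stream key described above. The "out"/"in"
// directions and the key layout follow the PR description; these helper
// names are illustrative, not the shipped API.
type SessionIo = "out" | "in";

function s2SessionStreamKey(friendlyId: string, io: SessionIo): string {
  return `sessions/${friendlyId}/${io}`;
}

function parseS2SessionStreamKey(
  key: string
): { friendlyId: string; io: SessionIo } | null {
  const match = /^sessions\/([^/]+)\/(out|in)$/.exec(key);
  return match
    ? { friendlyId: match[1], io: match[2] as SessionIo }
    : null;
}
```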
Auth
- sessions added to ResourceTypes. read:sessions:{id},
write:sessions:{id}, admin:sessions:{id} scopes work via existing JWT
validation.
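A simplified model of how these scopes authorize a read, combining the per-id scopes above with the super-scopes (read:sessions, read:all, admin) mentioned later in this PR. This is a sketch of the scope grammar, not the real JWT validation code.

```typescript
// Hedged sketch: which granted scopes allow reading a given session.
// Per-id scopes (read:sessions:{id}, admin:sessions:{id}) grant one session;
// read:sessions, read:all, and admin grant all. Function name is hypothetical
// and simplified relative to the actual JWT validation path.
function canReadSession(grantedScopes: string[], sessionId: string): boolean {
  const accepted = new Set([
    "admin",
    "read:all",
    "read:sessions",
    `read:sessions:${sessionId}`,
    `admin:sessions:${sessionId}`,
  ]);
  return grantedScopes.some((scope) => accepted.has(scope));
}
```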
ClickHouse
- sessions_v1 ReplacingMergeTree table
- SessionsReplicationService mirrors RunsReplicationService exactly:
logical replication with leader-locked consumer, ConcurrentFlushScheduler,
retry with exponential backoff + jitter, identical metric shape.
Dedicated slot + publication (sessions_to_clickhouse_v1[_publication]).
- SessionsRepository + ClickHouseSessionsRepository expose list, count,
tags with cursor pagination keyed by (created_at DESC, session_id DESC).
- Derived status (ACTIVE/CLOSED/EXPIRED) computed from closed_at + expires_at;
in-memory fallback on list results to catch pre-replication writes.
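The in-memory fallback for derived status can be sketched as a small pure function, assuming `closed_at` takes precedence over `expires_at` (a closed session stays CLOSED even after its expiry passes). The function name is hypothetical.

```typescript
// Sketch of the derived-status rule: CLOSED if closed_at is set, EXPIRED if
// expires_at has passed, otherwise ACTIVE. The closed-wins precedence is an
// assumption consistent with write-once close semantics.
type SessionStatus = "ACTIVE" | "CLOSED" | "EXPIRED";

function deriveSessionStatus(
  closedAt: Date | null,
  expiresAt: Date | null,
  now: Date = new Date()
): SessionStatus {
  if (closedAt) return "CLOSED";
  if (expiresAt && expiresAt.getTime() <= now.getTime()) return "EXPIRED";
  return "ACTIVE";
}
```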
Verification
- Webapp typecheck 10/10
- Core + SDK build 3/3
- sessionsReplicationService.test.ts integration tests 2/2 (insert + update
round-trip via testcontainers)
- Live round-trip against local dev: create -> retrieve (friendlyId and
externalId) -> out.initialize -> out.append x2 -> in.send -> out.subscribe
(receives records) -> close -> ClickHouse sessions_v1 shows the replicated
row with closed_reason
- Live list smoke: tag, type, status CLOSED, externalId, and cursor pagination
…te/update The session_ prefix identifies internal friendlyIds. Allowing it in a user-supplied externalId would misroute subsequent GET/PATCH/close requests through resolveSessionByIdOrExternalId to a friendlyId lookup, returning null or the wrong session. Reject at the schema boundary so both routes surface a clean 422.
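The routing rule and the schema-boundary guard described above can be sketched as pure helpers (names here are hypothetical; the PR routes through resolveSessionByIdOrExternalId):

```typescript
// A "session_"-prefixed identifier is treated as an internal friendlyId;
// anything else is treated as a user-supplied externalId. Rejecting the
// prefix at the schema boundary keeps the two namespaces disjoint.
type SessionLookup =
  | { by: "friendlyId"; value: string }
  | { by: "externalId"; value: string };

function resolveSessionLookup(identifier: string): SessionLookup {
  return identifier.startsWith("session_")
    ? { by: "friendlyId", value: identifier }
    : { by: "externalId", value: identifier };
}

// Schema-boundary guard: an externalId carrying the internal prefix would
// misroute later GET/PATCH/close requests, so it is rejected up front.
function isValidExternalId(externalId: string): boolean {
  return externalId.length > 0 && !externalId.startsWith("session_");
}
```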
Without allowJWT/corsStrategy, frontend clients holding public access tokens hit 401 on GET /api/v1/sessions and browser preflights fail. Matches the single-session GET/PATCH/close routes and the runs list endpoint.
- Derive isCached from the upsert result (id mismatch = pre-existing row) instead of doing a separate findFirst first. The pre-check was racy — two concurrent first-time POSTs could both return 201 with isCached: false. Using the returned row's id is atomic and saves a round-trip.
- Scope the list endpoint's authorization to the standard action/resource pattern (matches api.v1.runs.ts): task-scoped JWTs can list sessions filtered by their task, and broader super-scopes (read:sessions, read:all, admin) authorize unfiltered listing.
- Log and swallow unexpected errors on POST rather than returning the raw error.message. Prisma/internal messages can leak column names and query fragments.
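The atomic isCached derivation can be sketched with an in-memory stand-in for the Prisma upsert. The candidate id is generated up front and used as the upsert's create data; if the returned row carries a different id, the unique (environment, externalId) row pre-existed. Names here are illustrative.

```typescript
// Sketch of the "id mismatch = pre-existing row" pattern from the commit
// message above. `inMemoryUpsert` stands in for prisma.session.upsert.
type SessionRow = { id: string; externalId: string };

const store = new Map<string, SessionRow>();

function inMemoryUpsert(candidateId: string, externalId: string): SessionRow {
  const existing = store.get(externalId);
  if (existing) return existing; // unique hit: return the pre-existing row
  const row = { id: candidateId, externalId };
  store.set(externalId, row);
  return row;
}

function createSessionIdempotent(
  externalId: string,
  candidateId: string
): { session: SessionRow; isCached: boolean } {
  const session = inMemoryUpsert(candidateId, externalId);
  // If the returned id differs from the candidate, the upsert hit an
  // existing row, so this request is served from cache (201 vs cached).
  return { session, isCached: session.id !== candidateId };
}
```

Because the decision rides on the single upsert's return value, there is no window for two first-time POSTs to both observe "not found".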
Give Session channels run-engine waitpoint semantics so a task can
suspend while idle on a session channel and resume when an external
client sends a record — parallel to what streams.input offers
run-scoped streams.
Webapp
- POST /api/v1/runs/:runFriendlyId/session-streams/wait — creates a
manual waitpoint attached to {sessionId, io} and race-checks the S2
stream starting at lastSeqNum so pre-arrived data fires it
immediately. Mirrors the existing input-stream waitpoint route.
- sessionStreamWaitpointCache.server.ts — Redis set keyed on
{sessionFriendlyId, io}, drained atomically on each append so
concurrent multi-tab waiters all wake together.
- realtime.v1.sessions.$session.$io.append now drains pending
waitpoints after every record lands and completes each with the
appended body.
- S2RealtimeStreams.readSessionStreamRecords — session-channel
parallel of readRecords, feeds the race-check path.
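The wake-all-waiters semantics of the waitpoint cache can be modelled in memory. This is only a behavioural model: the real cache is a Redis set drained atomically on each append, and the class name is hypothetical.

```typescript
// In-memory model of sessionStreamWaitpointCache semantics: multiple waiters
// register on the same {sessionFriendlyId, io} channel, and one append drains
// and wakes all of them together.
class WaitpointCacheModel {
  private sets = new Map<string, Set<string>>();

  private key(sessionFriendlyId: string, io: string): string {
    return `${sessionFriendlyId}:${io}`;
  }

  add(sessionFriendlyId: string, io: string, waitpointId: string): void {
    const key = this.key(sessionFriendlyId, io);
    const set = this.sets.get(key) ?? new Set<string>();
    set.add(waitpointId);
    this.sets.set(key, set);
  }

  // Returns every pending waiter and empties the channel in one step, so
  // concurrent multi-tab waiters all wake from a single append.
  drain(sessionFriendlyId: string, io: string): string[] {
    const key = this.key(sessionFriendlyId, io);
    const set = this.sets.get(key);
    this.sets.delete(key);
    return set ? [...set] : [];
  }
}
```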
Core
- CreateSessionStreamWaitpoint request/response schemas alongside
the existing Session CRUD schemas. Server API contract only —
the client ApiClient + SDK wrapper ship on the AI-chat branch.
Force-pushed from 2210fe2 to 4cadc19.
await redis.sadd(key, waitpointId);
await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
🟡 pexpire on Redis set resets TTL for all waitpoints, causing premature eviction of longer-lived registrations
When multiple waitpoints with different timeouts are registered on the same (sessionFriendlyId, io) Redis set, each call to addSessionStreamWaitpoint invokes pexpire on the shared key, resetting the TTL for the entire set to the latest caller's value. If waitpoint A is registered with a 1-hour timeout, and waitpoint B is later registered on the same channel with a 5-minute timeout, the pexpire at line 53 overwrites the key TTL to 5 minutes. After 5 minutes the key expires, removing waitpoint A's registration from Redis. When data subsequently arrives on the channel (between 5 min and 1 hour), the append handler's drainSessionStreamWaitpoints call returns an empty set and waitpoint A is never completed — it will only time out via the engine's internal timeout mechanism instead of being fulfilled with the actual data.
The existing inputStreamWaitpointCache.server.ts avoids this because it stores a single string value per key (one waitpoint per (run, stream) pair), whereas sessions intentionally support multiple concurrent waiters via a Redis set.
Prompt for agents
In addSessionStreamWaitpoint (sessionStreamWaitpointCache.server.ts), the pexpire call at line 53 resets the TTL for the entire Redis set to the latest waitpoint's value. When multiple waitpoints with different timeouts coexist on the same (sessionFriendlyId, io) key, a shorter-TTL waitpoint added later will prematurely expire the key, evicting longer-lived waitpoint registrations.
Possible approaches:
1. Use the maximum of the current key TTL and the new TTL: read the current PTTL of the key, and only call pexpire if the new TTL is greater. This can be done atomically with a Lua script.
2. Store each waitpoint as a separate Redis key with its own TTL (e.g. ssw:{sessionId}:{io}:{waitpointId}), and use a SCAN/pattern match in drainSessionStreamWaitpoints. This trades atomicity of the drain for per-waitpoint TTL correctness.
3. Always use DEFAULT_TTL_MS for the key expiry (since it is already 7 days) and let the engine's internal timeout mechanism handle waitpoint-level expiry. Only use the custom ttlMs if it is larger than the current TTL.
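Approach 1 can be sketched like this. The Lua script and the helper are assumptions about how an extend-only expiry could look, not the shipped fix; the plain-TypeScript `nextTtlMs` encodes the same decision rule so it can be reasoned about without a Redis server.

```typescript
// Extend-only PEXPIRE: the script runs atomically in Redis and only raises
// the key TTL, never lowers it, so a later short-timeout waitpoint cannot
// prematurely evict longer-lived registrations on the shared set.
const EXTEND_ONLY_PEXPIRE_LUA = `
local current = redis.call('PTTL', KEYS[1])
local requested = tonumber(ARGV[1])
if current < 0 or requested > current then
  redis.call('PEXPIRE', KEYS[1], requested)
  return requested
end
return current
`;

// PTTL reports -1 (no expiry) or -2 (missing key) as negatives; in both
// cases the requested TTL should be applied. Otherwise keep the larger TTL.
function nextTtlMs(currentPttl: number, requestedTtlMs: number): number {
  if (currentPttl < 0) return requestedTtlMs;
  return Math.max(currentPttl, requestedTtlMs);
}
```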
Actionable comments posted: 8
♻️ Duplicate comments (2)
apps/webapp/app/routes/api.v1.sessions.$session.ts (1)

30-33: ⚠️ Potential issue | 🟡 Minor

Filter out missing `externalId` before authorization. Line 32 passes `""` as a session identifier when `externalId` is null. Build the resource list conditionally so an empty string cannot match a malformed or permissive scope.

Proposed fix

- resource: (session) => ({ sessions: [session.friendlyId, session.externalId ?? ""] }),
+ resource: (session) => ({
+   sessions: session.externalId
+     ? [session.friendlyId, session.externalId]
+     : [session.friendlyId],
+ }),

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/routes/api.v1.sessions.$session.ts` around lines 30 - 33, the authorization resource builder currently includes session.externalId ?? "" which can add an empty string as an identifier; update the resource function in the authorization object (resource: (session) => ...) to only include session.externalId when it is non-null (e.g., build an array starting with session.friendlyId and push session.externalId only if truthy) so the resulting sessions list contains no empty-string entries and cannot match malformed or permissive scopes.

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts (1)
77-85: ⚠️ Potential issue | 🟠 Major

Return a generic 500 for append failures. Line 84 surfaces `appendError.message` for non-validation failures. Log the internal error server-side and return a stable generic message to avoid leaking S2/client/internal details.

Proposed fix

  if (appendError instanceof ServiceValidationError) {
    return json(
      { ok: false, error: appendError.message },
      { status: appendError.status ?? 422 }
    );
  }
- return json({ ok: false, error: appendError.message }, { status: 500 });
+ logger.error("Failed to append to session stream", {
+   sessionFriendlyId: session.friendlyId,
+   io: params.io,
+   error: appendError,
+ });
+ return json({ ok: false, error: "Something went wrong" }, { status: 500 });

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts` around lines 77 - 85, the current error handling block returns appendError.message for non-validation errors; change it so that when appendError is not an instance of ServiceValidationError you log the full error server-side (e.g., using the existing logger) and return a stable generic JSON error message via json({ ok: false, error: "Internal server error" }, { status: 500 }) instead of exposing appendError.message; keep the ServiceValidationError branch unchanged.
🧹 Nitpick comments (3)
apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts (1)
76-84: Recommended: parallelize `createWaitpointTag` calls.

The sequential `await` in the loop serializes up to MAX_TAGS_PER_WAITPOINT round-trips on the hot path. A `Promise.all` over `bodyTags.map(...)` is a trivial change with meaningful latency savings and no correctness impact (each `createWaitpointTag` is scoped by env+project+tag).

♻️ Suggested fix

- if (bodyTags && bodyTags.length > 0) {
-   for (const tag of bodyTags) {
-     await createWaitpointTag({
-       tag,
-       environmentId: authentication.environment.id,
-       projectId: authentication.environment.projectId,
-     });
-   }
- }
+ if (bodyTags && bodyTags.length > 0) {
+   await Promise.all(
+     bodyTags.map((tag) =>
+       createWaitpointTag({
+         tag,
+         environmentId: authentication.environment.id,
+         projectId: authentication.environment.projectId,
+       })
+     )
+   );
+ }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts` around lines 76 - 84, the loop awaits createWaitpointTag sequentially, which serializes network calls; replace the for-await pattern with Promise.all(bodyTags.map(tag => createWaitpointTag({ tag, environmentId: authentication.environment.id, projectId: authentication.environment.projectId }))) so all calls run concurrently (guard as before with if (bodyTags && bodyTags.length > 0)); keep the same payload shape and await the Promise.all result to preserve error propagation.

apps/webapp/test/sessionsReplicationService.test.ts (1)
1-205: Move this test beside the service under test.

This new suite targets `~/services/sessionsReplicationService.server`, but it lives in `apps/webapp/test/`. Please place it next to the service file using the repository's adjacent test naming convention.

As per coding guidelines, "Test files should live beside the files under test and use descriptive `describe` and `it` blocks" and "Place test files next to source files with naming pattern MyService.ts -> MyService.test.ts."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/webapp/test/sessionsReplicationService.test.ts` around lines 1 - 205, the test suite for SessionsReplicationService is in the wrong place; move this file so it sits adjacent to the SessionsReplicationService implementation and rename it to follow the adjacent naming convention (e.g., SessionsReplicationService.test.ts). Update all imports that use the app-root alias (~/services/sessionsReplicationService.server) to relative imports pointing to the service file, and ensure test helpers like containerTest, ClickHouse, and prisma imports resolve correctly from the new location; keep the describe/it semantics but ensure the file name matches the service-to-test mapping.

packages/core/src/v3/schemas/api.ts (1)
1502-1502: Nit: reject empty `taskIdentifier`.

`z.string().max(128).optional()` accepts `""`, which will then be stored as-is and confuse downstream lookups/filters that treat absence vs. empty differently. Align with `externalId` by trimming and requiring at least one character.

- taskIdentifier: z.string().max(128).optional(),
+ taskIdentifier: z.string().trim().min(1).max(128).optional(),

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/core/src/v3/schemas/api.ts` at line 1502, the taskIdentifier schema currently allows empty strings; update the validator to reject empty values by trimming and requiring at least one character (z.string().trim().min(1).max(128).optional()) so it aligns with externalId and avoids storing "" versus absent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/webapp/app/entry.server.tsx`:
- Around line 45-52: The SIGTERM/SIGINT handlers currently bind the async method
sessionsReplicationInstance.shutdown directly, which can produce unhandled
promise rejections; change the handlers registered via
signalsEmitter.on("SIGTERM", ...) and signalsEmitter.on("SIGINT", ...) to call
an async wrapper that invokes sessionsReplicationInstance.shutdown() and
attaches a .catch(...) to log or swallow errors (mirror the pattern used in
dynamicFlushScheduler.server for safe async signal handling), ensuring any
rejection from _replicationClient.stop() is handled.
In `@apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts`:
- Around line 143-161: The empty catch after the race-check should not swallow
errors — replace it with a logger.warn/error that includes the context
(sessionFriendlyId, io, and WaitpointId.toFriendlyId(result.waitpoint.id) or
waitpointId variable) and the caught error so ops can distinguish cached-drain
vs broken race-checks; and in the outer catch branch that currently does `return
json({ error: error.message }, { status: 500 })` do not return raw internal
messages — log the error (with sessionFriendlyId/io/waitpointId/context) and
return a generic 500 response like `{ error: "Something went wrong" }` while
preserving the existing 422 handling for ServiceValidationError.
In `@apps/webapp/app/routes/api.v1.sessions.$session.close.ts`:
- Around line 45-55: The close endpoint currently does a read-then-update which
has a TOCTOU race: when updating the session in the block that calls
prisma.session.update (using existing.id and setting closedAt/closedReason),
change the update to be conditional by adding closedAt: null to the where clause
so the DB only writes if not already closed; after the conditional update, if no
row was affected (update returned null or zero), re-query the session (the
existing/serializeSession path) and return that value to ensure idempotent,
write-once behavior for closedAt/closedReason.
In `@apps/webapp/app/routes/api.v1.sessions.ts`:
- Around line 88-93: The create session POST route uses createActionApiRoute but
is missing the auth/CORS flags; update the options object passed to
createActionApiRoute for the create action (the one with body:
CreateSessionRequestBody and method: "POST") to include allowJWT: true and
corsStrategy: "all" so that JWT-scoped clients and cross-origin requests are
permitted for the create endpoint.
In `@apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts`:
- Around line 21-35: The route options passed into createActionApiRoute should
include method: "PUT" so framework-level HTTP method validation runs before the
handler; update the options object (the first argument to createActionApiRoute
that currently lists params: ParamsSchema, allowJWT, corsStrategy,
authorization) to add method: "PUT" (referencing createActionApiRoute and
ParamsSchema to locate the block) so the PUT-only enforcement happens earlier in
the request lifecycle.
In `@apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts`:
- Around line 101-113: Replace the exported interface ISessionsRepository with
an exported type alias that preserves the exact shape (name: string and the same
method signatures for listSessionIds, listSessions, countSessions, and listTags)
so callers and implementations (e.g., any classes or functions referencing
ISessionsRepository) keep the same contract; update the declaration of
ISessionsRepository to use the TypeScript "type" keyword and ensure the
referenced types ListSessionsOptions, ListedSession, SessionListInputOptions,
SessionTagListOptions, and SessionTagList remain unchanged.
In `@apps/webapp/app/services/sessionStreamWaitpointCache.server.ts`:
- Around line 50-53: Replace separate SADD and PEXPIRE calls with a Redis
transaction so both operations succeed or fail together: use redis.multi() (or
pipeline()) to queue redis.sadd(buildKey(sessionFriendlyId, io), waitpointId)
and redis.pexpire(buildKey(sessionFriendlyId, io), ttlMs ?? DEFAULT_TTL_MS) and
then await exec()/execAsync(), check the exec result for errors, and handle
failures the same way you would for the try/catch around the current redis calls
so the key cannot be left without a TTL.
In `@packages/core/src/v3/schemas/api.ts`:
- Around line 1556-1559: Rename the CloseSessionRequestBody schema field from
reason to closedReason to match the route mapping and response shape: update the
z.object in CloseSessionRequestBody to use closedReason:
z.string().max(256).optional(), keep the exported type CloseSessionRequestBody =
z.infer<typeof CloseSessionRequestBody> unchanged, and run a quick grep for
CloseSessionRequestBody usage to adjust any callers expecting the old reason
property.
---
Duplicate comments:
In `@apps/webapp/app/routes/api.v1.sessions.$session.ts`:
- Around line 30-33: The authorization resource builder currently includes
session.externalId ?? "" which can add an empty string as an identifier; update
the resource function used in the authorization object (resource: (session) =>
...) to only include session.externalId when it is non-null/defined (e.g., build
an array starting with session.friendlyId and push session.externalId only if
truthy) so the resulting sessions list contains no empty-string entries and
cannot match malformed/permissive scopes.
In `@apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts`:
- Around line 77-85: The current error handling block returns
appendError.message for non-validation errors; change it so that when
appendError is not an instance of ServiceValidationError you log the full error
server-side (e.g., using the existing logger or console.error) and return a
stable generic JSON error message via json({ ok: false, error: "Internal server
error" }, { status: 500 }) instead of exposing appendError.message; keep the
ServiceValidationError branch unchanged and reference the same symbols
appendError, ServiceValidationError, and json to locate and update the logic.
---
Nitpick comments:
In `@apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts`:
- Around line 76-84: The loop is awaiting createWaitpointTag sequentially which
serializes network calls; replace the for-await pattern with a parallelized call
like Promise.all(bodyTags.map(tag => createWaitpointTag({ tag, environmentId:
authentication.environment.id, projectId: authentication.environment.projectId
}))) so all createWaitpointTag calls run concurrently (guard as before with if
(bodyTags && bodyTags.length > 0)); keep the same payload shape and await the
Promise.all result to preserve error propagation.
In `@apps/webapp/test/sessionsReplicationService.test.ts`:
- Around line 1-205: The test suite for SessionsReplicationService is in the
wrong place; move this test file so it sits adjacent to the
SessionsReplicationService implementation (the module that exports
SessionsReplicationService / sessionsReplicationService.server) and rename it to
follow the adjacent naming convention (e.g.,
SessionsReplicationService.test.ts). Update all imports that use the app-root
alias (~/services/sessionsReplicationService.server) to relative imports
pointing to the service file, and ensure any test helpers like containerTest,
ClickHouse, and prisma imports resolve correctly from the new location; keep the
describe/it semantics but ensure the file name matches the service-to-test
mapping.
In `@packages/core/src/v3/schemas/api.ts`:
- Line 1502: The taskIdentifier schema currently allows empty strings; update
the validator for taskIdentifier (in the API schema where taskIdentifier:
z.string().max(128).optional()) to reject empty values by trimming and requiring
at least one character—e.g., use z.string().trim().min(1).max(128).optional()—so
it aligns with externalId and avoids storing "" versus absent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: bfe7b4b3-0bee-49c6-893c-6b4cc70e98af
📒 Files selected for processing (28)
- .changeset/session-primitive.md
- .server-changes/session-primitive.md
- apps/webapp/app/entry.server.tsx
- apps/webapp/app/env.server.ts
- apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts
- apps/webapp/app/routes/api.v1.sessions.$session.close.ts
- apps/webapp/app/routes/api.v1.sessions.$session.ts
- apps/webapp/app/routes/api.v1.sessions.ts
- apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
- apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts
- apps/webapp/app/services/authorization.server.ts
- apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
- apps/webapp/app/services/realtime/sessions.server.ts
- apps/webapp/app/services/sessionStreamWaitpointCache.server.ts
- apps/webapp/app/services/sessionsReplicationInstance.server.ts
- apps/webapp/app/services/sessionsReplicationService.server.ts
- apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts
- apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts
- apps/webapp/app/v3/services/adminWorker.server.ts
- apps/webapp/test/sessionsReplicationService.test.ts
- internal-packages/clickhouse/schema/030_create_sessions_v1.sql
- internal-packages/clickhouse/src/index.ts
- internal-packages/clickhouse/src/sessions.ts
- internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql
- internal-packages/database/prisma/schema.prisma
- packages/core/src/v3/isomorphic/friendlyId.ts
- packages/core/src/v3/schemas/api.ts
✅ Files skipped from review due to trivial changes (6)
- packages/core/src/v3/isomorphic/friendlyId.ts
- apps/webapp/app/services/authorization.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts
- apps/webapp/app/services/sessionsReplicationInstance.server.ts
- internal-packages/clickhouse/schema/030_create_sessions_v1.sql
- apps/webapp/app/services/sessionsReplicationService.server.ts
🚧 Files skipped from review as they are similar to previous changes (6)
- apps/webapp/app/v3/services/adminWorker.server.ts
- .changeset/session-primitive.md
- internal-packages/database/prisma/migrations/20260419000000_add_sessions_table/migration.sql
- internal-packages/clickhouse/src/index.ts
- apps/webapp/app/services/sessionsRepository/clickhouseSessionsRepository.server.ts
- apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
signalsEmitter.on(
  "SIGTERM",
  sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
signalsEmitter.on(
  "SIGINT",
  sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
Wrap async shutdown signal handlers with .catch to avoid unhandled rejections.
sessionsReplicationInstance.shutdown is async and can reject (e.g., _replicationClient.stop() errors). Without a .catch, a rejection during SIGTERM/SIGINT becomes an unhandled promise rejection while the process is already tearing down. This is the pattern used elsewhere in the codebase (see apps/webapp/app/v3/dynamicFlushScheduler.server.ts line 145-160).
♻️ Proposed fix
- signalsEmitter.on(
- "SIGTERM",
- sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
- );
- signalsEmitter.on(
- "SIGINT",
- sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
- );
+ const shutdownSessionsReplication = () =>
+ sessionsReplicationInstance.shutdown().catch((error) => {
+ console.error("🗃️ Sessions replication service shutdown error", { error });
+ });
+ signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
+ signalsEmitter.on("SIGINT", shutdownSessionsReplication);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/entry.server.tsx` around lines 45 - 52, The SIGTERM/SIGINT
handlers currently bind the async method sessionsReplicationInstance.shutdown
directly, which can produce unhandled promise rejections; change the handlers
registered via signalsEmitter.on("SIGTERM", ...) and signalsEmitter.on("SIGINT",
...) to call an async wrapper that invokes
sessionsReplicationInstance.shutdown() and attaches a .catch(...) to log or
swallow errors (mirror the pattern used in dynamicFlushScheduler.server for safe
async signal handling), ensuring any rejection from _replicationClient.stop() is
handled.
  } catch {
    // Non-fatal: pending registration stays in Redis; the next append
    // will complete the waitpoint via the append handler path.
  }
}

return json<CreateSessionStreamWaitpointResponseBody>({
  waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
  isCached: result.isCached,
});
} catch (error) {
  if (error instanceof ServiceValidationError) {
    return json({ error: error.message }, { status: 422 });
  } else if (error instanceof Error) {
    return json({ error: error.message }, { status: 500 });
  }

  return json({ error: "Something went wrong" }, { status: 500 });
}
Consider logging the swallowed race-check error and not leaking raw error messages.

Two related concerns in the error-handling tail:

- Line 143-146: the `catch {}` silently drops any failure from the race-check (S2 read / engine complete / cache remove). If this path has latent bugs you'll only notice via timeouts, since nothing is recorded. Please at least `logger.warn`/`logger.error` with `sessionFriendlyId`, `io`, and `waitpointId` so ops can tell cached-drain-on-append from broken race-checks.
- Line 156-158: `return json({ error: error.message }, { status: 500 })` forwards arbitrary internal error messages (e.g., Prisma, engine, S2) to the client. Per the PR commit message "log and swallow unexpected internal errors instead of returning raw Prisma/internal messages that could leak details", this route should follow the same pattern: log the error server-side and return a generic 500 message.
♻️ Suggested fix
- } catch {
- // Non-fatal: pending registration stays in Redis; the next append
- // will complete the waitpoint via the append handler path.
- }
+ } catch (error) {
+ // Non-fatal: pending registration stays in Redis; the next append
+ // will complete the waitpoint via the append handler path.
+ logger.warn("session-stream wait race-check failed", {
+ sessionFriendlyId: session.friendlyId,
+ io: body.io,
+ waitpointId: result.waitpoint.id,
+ error,
+ });
+ }
...
- } catch (error) {
- if (error instanceof ServiceValidationError) {
- return json({ error: error.message }, { status: 422 });
- } else if (error instanceof Error) {
- return json({ error: error.message }, { status: 500 });
- }
-
- return json({ error: "Something went wrong" }, { status: 500 });
- }
+ } catch (error) {
+ if (error instanceof ServiceValidationError) {
+ return json({ error: error.message }, { status: 422 });
+ }
+ logger.error("Failed to create session-stream waitpoint", { error });
+ return json({ error: "Something went wrong" }, { status: 500 });
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts`
around lines 143 - 161, The empty catch after the race-check should not swallow
errors — replace it with a logger.warn/error that includes the context
(sessionFriendlyId, io, and WaitpointId.toFriendlyId(result.waitpoint.id) or
waitpointId variable) and the caught error so ops can distinguish cached-drain
vs broken race-checks; and in the outer catch branch that currently does `return
json({ error: error.message }, { status: 500 })` do not return raw internal
messages — log the error (with sessionFriendlyId/io/waitpointId/context) and
return a generic 500 response like `{ error: "Something went wrong" }` while
preserving the existing 422 handling for ServiceValidationError.
if (existing.closedAt) {
  return json<RetrieveSessionResponseBody>(serializeSession(existing));
}

const updated = await prisma.session.update({
  where: { id: existing.id },
  data: {
    closedAt: new Date(),
    closedReason: body.reason ?? null,
  },
});
Minor: idempotent close has a TOCTOU — concurrent closes can overwrite closedAt/closedReason.
Two concurrent POST /close calls can both observe existing.closedAt == null and both run the update, so the second overwrites the first's closedAt timestamp and closedReason. Given the PR description states terminal markers are "write-once, never flipped back", you can enforce that at the DB level by scoping the update with closedAt: null and falling back to re-reading when the update affects zero rows.
♻️ Suggested fix
- const updated = await prisma.session.update({
- where: { id: existing.id },
- data: {
- closedAt: new Date(),
- closedReason: body.reason ?? null,
- },
- });
-
- return json<RetrieveSessionResponseBody>(serializeSession(updated));
+ const { count } = await prisma.session.updateMany({
+ where: { id: existing.id, closedAt: null },
+ data: {
+ closedAt: new Date(),
+ closedReason: body.reason ?? null,
+ },
+ });
+
+  const final = await prisma.session.findFirst({ where: { id: existing.id } });
+
+ if (!final) {
+ return json({ error: "Session not found" }, { status: 404 });
+ }
+
+  return json&lt;RetrieveSessionResponseBody&gt;(serializeSession(final));

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (existing.closedAt) {
  return json&lt;RetrieveSessionResponseBody&gt;(serializeSession(existing));
}

const { count } = await prisma.session.updateMany({
  where: { id: existing.id, closedAt: null },
  data: {
    closedAt: new Date(),
    closedReason: body.reason ?? null,
  },
});

const final = await prisma.session.findFirst({ where: { id: existing.id } });

if (!final) {
  return json({ error: "Session not found" }, { status: 404 });
}

return json&lt;RetrieveSessionResponseBody&gt;(serializeSession(final));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `apps/webapp/app/routes/api.v1.sessions.$session.close.ts` around lines 45 -
55, The close endpoint currently does a read-then-update which has a TOCTOU
race: when updating the session in the block that calls prisma.session.update
(using existing.id and setting closedAt/closedReason), change the update to be
conditional by adding closedAt: null to the where clause so the DB only writes
if not already closed; after the conditional update, if no row was affected
(update returned null or zero), re-query the session (the
existing/serializeSession path) and return that value to ensure idempotent,
write-once behavior for closedAt/closedReason.
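The write-once close semantics this comment asks for can be sketched without Prisma. The following is a minimal in-memory model (the `Session` shape, `closeOnce`, and the store are illustrative stand-ins, not the actual route code): a conditional update only succeeds while `closedAt` is still null, so the loser of a concurrent-close race re-reads instead of overwriting.

```typescript
// Minimal in-memory model of write-once close. Names are illustrative.
type Session = {
  id: string;
  closedAt: Date | null;
  closedReason: string | null;
};

const store = new Map<string, Session>();

// Mirrors `updateMany({ where: { id, closedAt: null }, data: ... })`:
// returns the number of rows affected (0 or 1).
function closeOnce(id: string, reason: string | null): number {
  const row = store.get(id);
  if (!row || row.closedAt !== null) return 0; // already closed: no-op
  row.closedAt = new Date();
  row.closedReason = reason;
  return 1;
}

// Idempotent close: losers of the race return the first writer's state.
function closeSession(id: string, reason: string | null): Session | undefined {
  closeOnce(id, reason); // affected count ignored; final state is re-read either way
  return store.get(id);
}

store.set("session_abc", { id: "session_abc", closedAt: null, closedReason: null });
const first = closeSession("session_abc", "user_done");
const second = closeSession("session_abc", "timeout");
// Both calls observe the first writer's closedReason and closedAt.
```

The same shape carries over to the route: `updateMany` scoped on `closedAt: null` plus an unconditional re-read gives write-once markers without a transaction.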
const { action } = createActionApiRoute(
  {
    body: CreateSessionRequestBody,
    method: "POST",
    maxContentLength: 1024 * 32, // 32KB — metadata is the only thing that grows
  },
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check whether createActionApiRoute defaults allow JWT auth/CORS for actions.
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec sed -n '1,260p' {}

Repository: triggerdotdev/trigger.dev
Length of output: 8654
🏁 Script executed:
#!/bin/bash
# Find createActionApiRoute function and check defaults
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec grep -n "createActionApiRoute\|allowJWT\|corsStrategy" {} | head -50

Repository: triggerdotdev/trigger.dev
Length of output: 2433
🏁 Script executed:
#!/bin/bash
# Read the full apiBuilder.server.ts to locate createActionApiRoute
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec wc -l {}

Repository: triggerdotdev/trigger.dev
Length of output: 133
🏁 Script executed:
#!/bin/bash
# Get the sessions route file and check all route configurations
cat -n apps/webapp/app/routes/api.v1.sessions.ts | head -150

Repository: triggerdotdev/trigger.dev
Length of output: 6980
🏁 Script executed:
#!/bin/bash
# Find other session route files mentioned in the review
fd -type f 'api\.v1\.sessions' apps/webapp/app/routes --exec basename {} \; | sort

Repository: triggerdotdev/trigger.dev
Length of output: 239
🏁 Script executed:
#!/bin/bash
# Check realtime session routes for JWT/CORS configuration
fd -type f 'realtime.*session' apps/webapp/app/routes --exec grep -l 'allowJWT\|corsStrategy' {} \;

Repository: triggerdotdev/trigger.dev
Length of output: 239
🏁 Script executed:
#!/bin/bash
# Find other session route files
fd 'api\.v1\.sessions' apps/webapp/app/routes | sort

Repository: triggerdotdev/trigger.dev
Length of output: 218
🏁 Script executed:
#!/bin/bash
# Find realtime session route files
fd 'realtime.*sessions' apps/webapp/app/routes | sort

Repository: triggerdotdev/trigger.dev
Length of output: 195
🏁 Script executed:
#!/bin/bash
# Check the main sessions routes for retrieve/update/realtime patterns
ls -la apps/webapp/app/routes/ | grep -i session | head -20

Repository: triggerdotdev/trigger.dev
Length of output: 593
🏁 Script executed:
#!/bin/bash
# Check retrieve/update route
cat apps/webapp/app/routes/api.v1.sessions.\$session.ts | head -80

Repository: triggerdotdev/trigger.dev
Length of output: 2415
🏁 Script executed:
#!/bin/bash
# Check close route
cat apps/webapp/app/routes/api.v1.sessions.\$session.close.ts

Repository: triggerdotdev/trigger.dev
Length of output: 1722
🏁 Script executed:
#!/bin/bash
# Check realtime session route
cat apps/webapp/app/routes/realtime.v1.sessions.\$session.\$io.ts | head -80

Repository: triggerdotdev/trigger.dev
Length of output: 2644
🏁 Script executed:
#!/bin/bash
# Check realtime append route
cat apps/webapp/app/routes/realtime.v1.sessions.\$session.\$io.append.ts | head -80

Repository: triggerdotdev/trigger.dev
Length of output: 3003
Enable JWT auth and CORS on the create endpoint.
The list, retrieve, update, close, realtime, and append session routes all explicitly set allowJWT: true and corsStrategy: "all", but the POST /api/v1/sessions create endpoint does not. This causes it to default to allowJWT: false and corsStrategy: "none", making session creation inaccessible to JWT-scoped clients.
Proposed fix
{
body: CreateSessionRequestBody,
method: "POST",
maxContentLength: 1024 * 32, // 32KB — metadata is the only thing that grows
+ allowJWT: true,
+ corsStrategy: "all",
  },

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `apps/webapp/app/routes/api.v1.sessions.ts` around lines 88 - 93, The create
session POST route uses createActionApiRoute but is missing the auth/CORS flags;
update the options object passed to createActionApiRoute for the create action
(the one with body: CreateSessionRequestBody and method: "POST") to include
allowJWT: true and corsStrategy: "all" so that JWT-scoped clients and
cross-origin requests are permitted for the create endpoint.
const { action } = createActionApiRoute(
  {
    params: ParamsSchema,
    allowJWT: true,
    corsStrategy: "all",
    authorization: {
      action: "write",
      resource: (params) => ({ sessions: params.session }),
      superScopes: ["write:sessions", "write:all", "admin"],
    },
  },
  async ({ request, params, authentication }) => {
    if (request.method !== "PUT") {
      return new Response("Method not allowed", { status: 405 });
    }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Confirm the default method behavior for createActionApiRoute.
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec sed -n '1,260p' {}

Repository: triggerdotdev/trigger.dev
Length of output: 8654
🏁 Script executed:
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec wc -l {}

Repository: triggerdotdev/trigger.dev
Length of output: 133
🏁 Script executed:
fd -i '^apiBuilder\.server\.ts$' apps/webapp/app/services --exec sed -n '260,600p' {}

Repository: triggerdotdev/trigger.dev
Length of output: 11086
🏁 Script executed:
cat -n apps/webapp/app/routes/realtime.v1.sessions.\$session.\$io.ts | head -50

Repository: triggerdotdev/trigger.dev
Length of output: 2087
Add method: "PUT" to the route options.
The route builder accepts an optional method parameter which, when provided, enforces HTTP method validation at the framework level before the handler executes. Without it, the method check is skipped in the route builder, leaving only the manual check inside the handler. Set method: "PUT" explicitly so validation occurs earlier in the request lifecycle.
Proposed fix
const { action } = createActionApiRoute(
{
params: ParamsSchema,
+ method: "PUT",
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "write",
resource: (params) => ({ sessions: params.session }),
superScopes: ["write:sessions", "write:all", "admin"],
},
  },

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts` around lines 21
- 35, The route options passed into createActionApiRoute should include method:
"PUT" so framework-level HTTP method validation runs before the handler; update
the options object (the first argument to createActionApiRoute that currently
lists params: ParamsSchema, allowJWT, corsStrategy, authorization) to add
method: "PUT" (referencing createActionApiRoute and ParamsSchema to locate the
block) so the PUT-only enforcement happens earlier in the request lifecycle.
export interface ISessionsRepository {
  name: string;
  listSessionIds(options: ListSessionsOptions): Promise&lt;string[]&gt;;
  listSessions(options: ListSessionsOptions): Promise&lt;{
    sessions: ListedSession[];
    pagination: {
      nextCursor: string | null;
      previousCursor: string | null;
    };
  }&gt;;
  countSessions(options: SessionListInputOptions): Promise&lt;number&gt;;
  listTags(options: SessionTagListOptions): Promise&lt;SessionTagList&gt;;
}
🛠️ Refactor suggestion | 🟠 Major
Prefer type over interface per coding guidelines.
As per coding guidelines (**/*.{ts,tsx}: "Use types over interfaces for TypeScript"), convert ISessionsRepository to a type alias.
♻️ Proposed fix
-export interface ISessionsRepository {
- name: string;
- listSessionIds(options: ListSessionsOptions): Promise<string[]>;
- listSessions(options: ListSessionsOptions): Promise<{
- sessions: ListedSession[];
- pagination: {
- nextCursor: string | null;
- previousCursor: string | null;
- };
- }>;
- countSessions(options: SessionListInputOptions): Promise<number>;
- listTags(options: SessionTagListOptions): Promise<SessionTagList>;
-}
+export type ISessionsRepository = {
+ name: string;
+ listSessionIds(options: ListSessionsOptions): Promise<string[]>;
+ listSessions(options: ListSessionsOptions): Promise<{
+ sessions: ListedSession[];
+ pagination: {
+ nextCursor: string | null;
+ previousCursor: string | null;
+ };
+ }>;
+ countSessions(options: SessionListInputOptions): Promise<number>;
+ listTags(options: SessionTagListOptions): Promise<SessionTagList>;
+};

📝 Committable suggestion
export type ISessionsRepository = {
  name: string;
  listSessionIds(options: ListSessionsOptions): Promise&lt;string[]&gt;;
  listSessions(options: ListSessionsOptions): Promise&lt;{
    sessions: ListedSession[];
    pagination: {
      nextCursor: string | null;
      previousCursor: string | null;
    };
  }&gt;;
  countSessions(options: SessionListInputOptions): Promise&lt;number&gt;;
  listTags(options: SessionTagListOptions): Promise&lt;SessionTagList&gt;;
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `apps/webapp/app/services/sessionsRepository/sessionsRepository.server.ts`
around lines 101 - 113, Replace the exported interface ISessionsRepository with
an exported type alias that preserves the exact shape (name: string and the same
method signatures for listSessionIds, listSessions, countSessions, and listTags)
so callers and implementations (e.g., any classes or functions referencing
ISessionsRepository) keep the same contract; update the declaration of
ISessionsRepository to use the TypeScript "type" keyword and ensure the
referenced types ListSessionsOptions, ListedSession, SessionListInputOptions,
SessionTagListOptions, and SessionTagList remain unchanged.
try {
  const key = buildKey(sessionFriendlyId, io);
  await redis.sadd(key, waitpointId);
  await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
Set the waitpoint and TTL in one Redis transaction.
If SADD succeeds and PEXPIRE fails, this key can persist indefinitely. Use multi() so registration and TTL assignment succeed/fail together.
Proposed fix
const key = buildKey(sessionFriendlyId, io);
- await redis.sadd(key, waitpointId);
- await redis.pexpire(key, ttlMs ?? DEFAULT_TTL_MS);
+  await redis.multi().sadd(key, waitpointId).pexpire(key, ttlMs ?? DEFAULT_TTL_MS).exec();

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `apps/webapp/app/services/sessionStreamWaitpointCache.server.ts` around lines
50 - 53, Replace separate SADD and PEXPIRE calls with a Redis transaction so
both operations succeed or fail together: use redis.multi() (or pipeline()) to
queue redis.sadd(buildKey(sessionFriendlyId, io), waitpointId) and
redis.pexpire(buildKey(sessionFriendlyId, io), ttlMs ?? DEFAULT_TTL_MS) and then
await exec()/execAsync(), check the exec result for errors, and handle failures
the same way you would for the try/catch around the current redis calls so the
key cannot be left without a TTL.
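The failure mode behind this suggestion can be demonstrated with a toy model. `FakeRedis` and `Multi` below are illustrative stand-ins (the real code would use ioredis's `multi().sadd(...).pexpire(...).exec()`), and the key string is hypothetical; the point is that separate commands leave a window where the set exists with no TTL, while queued commands apply together.

```typescript
// Toy model: why SADD + PEXPIRE should be queued atomically.
type Entry = { members: Set<string>; ttlMs: number | null };

class FakeRedis {
  store = new Map<string, Entry>();

  // Immediate commands, as in the non-atomic path.
  sadd(key: string, member: string): void {
    const e = this.store.get(key) ?? { members: new Set<string>(), ttlMs: null };
    e.members.add(member);
    this.store.set(key, e);
  }
  pexpire(key: string, ttlMs: number): void {
    const e = this.store.get(key);
    if (e) e.ttlMs = ttlMs;
  }

  multi(): Multi {
    return new Multi(this);
  }
}

// Buffers commands and applies them together on exec(), like MULTI/EXEC.
class Multi {
  private ops: Array<() => void> = [];
  constructor(private target: FakeRedis) {}
  sadd(key: string, member: string): this {
    this.ops.push(() => this.target.sadd(key, member));
    return this;
  }
  pexpire(key: string, ttlMs: number): this {
    this.ops.push(() => this.target.pexpire(key, ttlMs));
    return this;
  }
  exec(): void {
    for (const op of this.ops) op();
  }
}

const key = "session-streams:session_abc:out"; // hypothetical key shape

// Non-atomic: a crash between sadd and pexpire leaves the key with no TTL.
const r1 = new FakeRedis();
r1.sadd(key, "wp_1");
// (simulated crash here: pexpire never runs, key lingers forever)

// Atomic: registration and TTL land together.
const r2 = new FakeRedis();
r2.multi().sadd(key, "wp_1").pexpire(key, 60_000).exec();
```

As the prompt notes, the real `exec()` result should also be checked for per-command errors before treating the registration as durable.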
export const CloseSessionRequestBody = z.object({
  reason: z.string().max(256).optional(),
});
export type CloseSessionRequestBody = z.infer&lt;typeof CloseSessionRequestBody&gt;;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Confirm which field name the close route reads.
fd -t f 'api\.v1\.sessions\.\$session\.close\.ts$' | xargs -r cat
rg -nP '\b(closedReason|body\.reason)\b' -C2

Repository: triggerdotdev/trigger.dev
Length of output: 6643
Rename reason to closedReason for API consistency.
The close request body uses reason, but the route explicitly maps it to the database field closedReason (line 53 of apps/webapp/app/routes/api.v1.sessions.$session.close.ts), and the response schema also uses closedReason. This asymmetry creates an awkward API surface where the request and response field names differ.
Recommended change
export const CloseSessionRequestBody = z.object({
- reason: z.string().max(256).optional(),
+ closedReason: z.string().max(256).optional(),
});

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `packages/core/src/v3/schemas/api.ts` around lines 1556 - 1559, Rename the
CloseSessionRequestBody schema field from reason to closedReason to match the
route mapping and response shape: update the z.object in CloseSessionRequestBody
to use closedReason: z.string().max(256).optional(), keep the exported type
CloseSessionRequestBody = z.infer<typeof CloseSessionRequestBody> unchanged, and
run a quick grep for CloseSessionRequestBody usage to adjust any callers
expecting the old reason property.
What this enables

A new first-class primitive, Session, for durable bidirectional I/O that outlives a single run. Sessions give you a server-managed channel pair (`.out` from the task, `.in` from the client) that you can write to, read from, and subscribe to across many runs, filter, list, and close, all through a single identifier.

Use cases unblocked

- … `.in`, the client writes to `.in`, and the server enforces no-writes-after-close.
- … `.out` after the task finishes to replay the history.

Public API surface

Control plane

- `POST /api/v1/sessions` to create. Idempotent when you supply `externalId`.
- `GET /api/v1/sessions/:session` to retrieve by friendlyId (`session_abc`) or by your own `externalId`. The server disambiguates via the `session_` prefix.
- `GET /api/v1/sessions` to list with filters (`type`, `tag`, `taskIdentifier`, `externalId`, derived `status` = ACTIVE/CLOSED/EXPIRED, created-at period/from/to) and cursor pagination. Backed by ClickHouse.
- `PATCH /api/v1/sessions/:session` to update tags/metadata/externalId.
- `POST /api/v1/sessions/:session/close` to terminate. Idempotent, hard-blocks new server-brokered writes.

Realtime

- `PUT /realtime/v1/sessions/:session/:io` to initialize a channel. Returns S2 credentials in headers so clients can write direct to S2 for high-throughput cases.
- `GET /realtime/v1/sessions/:session/:io` for SSE subscribe.
- `POST /realtime/v1/sessions/:session/:io/append` for server-side appends.

Scopes

- `sessions` is now a ResourceType. `read:sessions:{id}`, `write:sessions:{id}`, `admin:sessions:{id}` all flow through the existing JWT validator.

Implementation summary

Postgres (`Session` table)

- Point-lookup indexes only: `friendlyId` unique, `(env, externalId)` unique, `expiresAt`. List queries are served from ClickHouse, so Postgres stays insert-heavy.
- Terminal markers (`closedAt`, `closedReason`, `expiresAt`) are write-once. No status enum, no counters, no currentRunId pointer. All run-related state is derived.

ClickHouse (`sessions_v1`)

- … `(org_id, project_id, environment_id, created_at, session_id)`. `tags` indexed with a tokenbf_v1 skip index.
- `SessionsReplicationService` mirrors `RunsReplicationService` exactly: logical replication with leader-locked consumer, `ConcurrentFlushScheduler`, retry with exponential backoff + jitter, identical metric shape. Dedicated slot + publication so the two consume independently.
- `SessionsRepository` + `ClickHouseSessionsRepository` expose list / count / tags with the same cursor pagination convention as runs and waitpoints.

S2

- … `sessions/{friendlyId}/{out|in}`. The existing `runs/{runId}/{streamId}` format for implicit run streams is completely untouched.

What did not change

`streams.pipe` / `streams.input` still behave exactly as before. They do not create Session rows and the existing routes are unchanged. Sessions are a net-new primitive for the next phase of agent features, not a reshaping of the current streams API.

Verification

- `apps/webapp/test/sessionsReplicationService.test.ts` exercises insert and update round-trips through Postgres logical replication into ClickHouse via testcontainers.
- … `.out.initialize`, `.out.append` x2, `.in.send`, `.out.subscribe` over SSE, list (type, tag, status, externalId, pagination), close, idempotent re-close. Replicated row lands in ClickHouse within ~1s with `closed_reason` intact.

Not in this PR

- … `chat.agent`).
- … `chat.agent` integration.

Test plan

- `pnpm run typecheck --filter webapp`
- `pnpm run test --filter webapp ./test/sessionsReplicationService.test.ts --run`
- … `SESSION_REPLICATION_CLICKHOUSE_URL` and `SESSION_REPLICATION_ENABLED=1` set. Confirm the slot and publication auto-create on boot.
- … `POST /api/v1/sessions` and verify the row replicates to `trigger_dev.sessions_v1` within a couple of seconds.
- … `POST /api/v1/sessions/:id/close` and confirm subsequent `POST /realtime/v1/sessions/:id/out/append` returns 400.