-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat: Sessions - bidirectional durable agent streams #3417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
16ee28f
6f9dbe6
27fb4a4
829ccc4
95f3c00
4cadc19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@trigger.dev/core": patch | ||
| --- | ||
|
|
||
| Add `SessionId` friendly ID generator and schemas for the new durable Session primitive. Exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Ships the `CreateSessionStreamWaitpoint` request/response schemas alongside the main Session CRUD. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows. | ||
|
|
||
| Adds `POST /api/v1/runs/:runFriendlyId/session-streams/wait` (session-stream waitpoint creation) and wires `POST /realtime/v1/sessions/:session/:io/append` to fire any pending waitpoints on the channel. Gives `session.in` run-engine waitpoint semantics matching run-scoped input streams: a task can suspend while idle on a session channel and resume when an external client sends a record. Redis-backed pending-waitpoint set (`ssw:{sessionFriendlyId}:{io}`) is drained atomically on each append so multiple concurrent waiters (e.g. multi-tab chat) all resume together. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,165 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { | ||
| CreateSessionStreamWaitpointRequestBody, | ||
| type CreateSessionStreamWaitpointResponseBody, | ||
| } from "@trigger.dev/core/v3"; | ||
| import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; | ||
| import { z } from "zod"; | ||
| import { $replica } from "~/db.server"; | ||
| import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server"; | ||
| import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; | ||
| import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server"; | ||
| import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; | ||
| import { | ||
| addSessionStreamWaitpoint, | ||
| removeSessionStreamWaitpoint, | ||
| } from "~/services/sessionStreamWaitpointCache.server"; | ||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
| import { parseDelay } from "~/utils/delays"; | ||
| import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; | ||
| import { engine } from "~/v3/runEngine.server"; | ||
| import { ServiceValidationError } from "~/v3/services/baseService.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| runFriendlyId: z.string(), | ||
| }); | ||
|
|
||
| const { action, loader } = createActionApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| body: CreateSessionStreamWaitpointRequestBody, | ||
| maxContentLength: 1024 * 10, // 10KB | ||
| method: "POST", | ||
| }, | ||
| async ({ authentication, body, params }) => { | ||
| try { | ||
| const run = await $replica.taskRun.findFirst({ | ||
| where: { | ||
| friendlyId: params.runFriendlyId, | ||
| runtimeEnvironmentId: authentication.environment.id, | ||
| }, | ||
| select: { | ||
| id: true, | ||
| friendlyId: true, | ||
| realtimeStreamsVersion: true, | ||
| }, | ||
| }); | ||
|
|
||
| if (!run) { | ||
| return json({ error: "Run not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| const session = await resolveSessionByIdOrExternalId( | ||
| $replica, | ||
| authentication.environment.id, | ||
| body.session | ||
| ); | ||
|
|
||
| if (!session) { | ||
| return json({ error: "Session not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| const idempotencyKeyExpiresAt = body.idempotencyKeyTTL | ||
| ? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL) | ||
| : undefined; | ||
|
|
||
| const timeout = await parseDelay(body.timeout); | ||
|
|
||
| const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags; | ||
|
|
||
| if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) { | ||
| throw new ServiceValidationError( | ||
| `Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.` | ||
| ); | ||
| } | ||
|
|
||
| if (bodyTags && bodyTags.length > 0) { | ||
| for (const tag of bodyTags) { | ||
| await createWaitpointTag({ | ||
| tag, | ||
| environmentId: authentication.environment.id, | ||
| projectId: authentication.environment.projectId, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| // Step 1: Create the waitpoint. | ||
| const result = await engine.createManualWaitpoint({ | ||
| environmentId: authentication.environment.id, | ||
| projectId: authentication.environment.projectId, | ||
| idempotencyKey: body.idempotencyKey, | ||
| idempotencyKeyExpiresAt, | ||
| timeout, | ||
| tags: bodyTags, | ||
| }); | ||
|
|
||
| // Step 2: Register the waitpoint on the session channel so the next | ||
| // append fires it. Keyed by (sessionFriendlyId, io) — both runs on a | ||
| // multi-tab session wake on the same record. | ||
| const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; | ||
| await addSessionStreamWaitpoint( | ||
| session.friendlyId, | ||
| body.io, | ||
| result.waitpoint.id, | ||
| ttlMs && ttlMs > 0 ? ttlMs : undefined | ||
| ); | ||
|
|
||
| // Step 3: Race-check. If a record landed on the channel before this | ||
| // .wait() call, complete the waitpoint synchronously with that data | ||
| // and remove the pending registration. | ||
| if (!result.isCached) { | ||
| try { | ||
| const realtimeStream = getRealtimeStreamInstance( | ||
| authentication.environment, | ||
| run.realtimeStreamsVersion | ||
| ); | ||
|
|
||
| if (realtimeStream instanceof S2RealtimeStreams) { | ||
| const records = await realtimeStream.readSessionStreamRecords( | ||
| session.friendlyId, | ||
| body.io, | ||
| body.lastSeqNum | ||
| ); | ||
|
|
||
| if (records.length > 0) { | ||
| const record = records[0]!; | ||
|
|
||
| await engine.completeWaitpoint({ | ||
| id: result.waitpoint.id, | ||
| output: { | ||
| value: record.data, | ||
| type: "application/json", | ||
| isError: false, | ||
| }, | ||
| }); | ||
|
|
||
| await removeSessionStreamWaitpoint( | ||
| session.friendlyId, | ||
| body.io, | ||
| result.waitpoint.id | ||
| ); | ||
| } | ||
| } | ||
| } catch { | ||
| // Non-fatal: pending registration stays in Redis; the next append | ||
| // will complete the waitpoint via the append handler path. | ||
| } | ||
| } | ||
|
|
||
| return json<CreateSessionStreamWaitpointResponseBody>({ | ||
| waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), | ||
| isCached: result.isCached, | ||
| }); | ||
| } catch (error) { | ||
| if (error instanceof ServiceValidationError) { | ||
| return json({ error: error.message }, { status: 422 }); | ||
| } else if (error instanceof Error) { | ||
| return json({ error: error.message }, { status: 500 }); | ||
| } | ||
|
|
||
| return json({ error: "Something went wrong" }, { status: 500 }); | ||
| } | ||
|
Comment on lines
+143
to
+161
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider logging the swallowed race-check error and not leaking raw error messages. Two related concerns in the error-handling tail:
♻️ Suggested fix- } catch {
- // Non-fatal: pending registration stays in Redis; the next append
- // will complete the waitpoint via the append handler path.
- }
+ } catch (error) {
+ // Non-fatal: pending registration stays in Redis; the next append
+ // will complete the waitpoint via the append handler path.
+ logger.warn("session-stream wait race-check failed", {
+ sessionFriendlyId: session.friendlyId,
+ io: body.io,
+ waitpointId: result.waitpoint.id,
+ error,
+ });
+ }
...
- } catch (error) {
- if (error instanceof ServiceValidationError) {
- return json({ error: error.message }, { status: 422 });
- } else if (error instanceof Error) {
- return json({ error: error.message }, { status: 500 });
- }
-
- return json({ error: "Something went wrong" }, { status: 500 });
- }
+ } catch (error) {
+ if (error instanceof ServiceValidationError) {
+ return json({ error: error.message }, { status: 422 });
+ }
+ logger.error("Failed to create session-stream waitpoint", { error });
+ return json({ error: "Something went wrong" }, { status: 500 });
+ }🤖 Prompt for AI Agents |
||
| } | ||
| ); | ||
|
|
||
| export { action, loader }; | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,61 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { json } from "@remix-run/server-runtime"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| CloseSessionRequestBody, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| type RetrieveSessionResponseBody, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } from "@trigger.dev/core/v3"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { z } from "zod"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { prisma } from "~/db.server"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resolveSessionByIdOrExternalId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| serializeSession, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } from "~/services/realtime/sessions.server"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const ParamsSchema = z.object({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| session: z.string(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { action } = createActionApiRoute( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params: ParamsSchema, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| body: CloseSessionRequestBody, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxContentLength: 1024, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| method: "POST", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| allowJWT: true, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| corsStrategy: "all", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| authorization: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| action: "admin", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| resource: (params) => ({ sessions: params.session }), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| superScopes: ["admin:sessions", "admin:all", "admin"], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async ({ authentication, params, body }) => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const existing = await resolveSessionByIdOrExternalId( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| prisma, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| authentication.environment.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params.session | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!existing) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return json({ error: "Session not found" }, { status: 404 }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Idempotent: if already closed, return the current row without clobbering | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // the original closedAt / closedReason. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (existing.closedAt) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return json<RetrieveSessionResponseBody>(serializeSession(existing)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const updated = await prisma.session.update({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| where: { id: existing.id }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| closedAt: new Date(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| closedReason: body.reason ?? null, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+45
to
+55
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: idempotent close has a TOCTOU — concurrent closes can overwrite Two concurrent ♻️ Suggested fix- const updated = await prisma.session.update({
- where: { id: existing.id },
- data: {
- closedAt: new Date(),
- closedReason: body.reason ?? null,
- },
- });
-
- return json<RetrieveSessionResponseBody>(serializeSession(updated));
+ const { count } = await prisma.session.updateMany({
+ where: { id: existing.id, closedAt: null },
+ data: {
+ closedAt: new Date(),
+ closedReason: body.reason ?? null,
+ },
+ });
+
+ const final =
+ count === 0
+ ? await prisma.session.findFirst({ where: { id: existing.id } })
+ : await prisma.session.findFirst({ where: { id: existing.id } });
+
+ if (!final) {
+ return json({ error: "Session not found" }, { status: 404 });
+ }
+
+ return json<RetrieveSessionResponseBody>(serializeSession(final));📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return json<RetrieveSessionResponseBody>(serializeSession(updated)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export { action }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap async
shutdownsignal handlers with.catchto avoid unhandled rejections.sessionsReplicationInstance.shutdownis async and can reject (e.g.,_replicationClient.stop()errors). Without a.catch, a rejection during SIGTERM/SIGINT becomes an unhandled promise rejection while the process is already tearing down. This is the pattern used elsewhere in the codebase (seeapps/webapp/app/v3/dynamicFlushScheduler.server.tsline 145-160).♻️ Proposed fix
🤖 Prompt for AI Agents