5 changes: 5 additions & 0 deletions .changeset/session-primitive.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add `SessionId` friendly ID generator and schemas for the new durable Session primitive, exported from `@trigger.dev/core/v3/isomorphic` alongside `RunId`, `BatchId`, etc. Also ships the `CreateSessionStreamWaitpoint` request/response schemas in addition to the main Session CRUD schemas.
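A minimal sketch of what such a friendly-ID generator might look like — the actual `SessionId` implementation's alphabet, length, and helper names may differ; this only illustrates the `session_<random>` shape shared with `RunId`, `BatchId`, etc.:

```typescript
// Illustrative sketch only; not the real @trigger.dev/core implementation.
import { randomBytes } from "node:crypto";

const ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz";

// Generate a friendly ID of the form `session_<21 lowercase alphanumerics>`.
export function generateSessionFriendlyId(): string {
  let suffix = "";
  for (const byte of randomBytes(21)) {
    suffix += ALPHABET[byte % ALPHABET.length];
  }
  return `session_${suffix}`;
}

// Validate the shape produced above (hypothetical helper).
export function isSessionFriendlyId(id: string): boolean {
  return /^session_[0-9a-z]{21}$/.test(id);
}
```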
8 changes: 8 additions & 0 deletions .server-changes/session-primitive.md
@@ -0,0 +1,8 @@
---
area: webapp
type: feature
---

Add `Session` primitive — a durable, typed, bidirectional I/O primitive that outlives a single run, intended for agent/chat use cases. Ships the Postgres schema (`Session` table), control-plane CRUD routes (`POST/GET/PATCH /api/v1/sessions`, `POST /api/v1/sessions/:session/close` — polymorphic on friendlyId or externalId), `sessions` JWT scope, ClickHouse `sessions_v1` table, and `SessionsReplicationService` (logical replication from Postgres `Session` → ClickHouse `sessions_v1`). Run-scoped realtime streams (`streams.pipe`/`streams.input`) are unchanged and do **not** create Session rows.

Adds `POST /api/v1/runs/:runFriendlyId/session-streams/wait` (session-stream waitpoint creation) and wires `POST /realtime/v1/sessions/:session/:io/append` to fire any pending waitpoints on the channel. Gives `session.in` run-engine waitpoint semantics matching run-scoped input streams: a task can suspend while idle on a session channel and resume when an external client sends a record. Redis-backed pending-waitpoint set (`ssw:{sessionFriendlyId}:{io}`) is drained atomically on each append so multiple concurrent waiters (e.g. multi-tab chat) all resume together.
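The drain-on-append behavior described above can be sketched in TypeScript. Here an in-memory `Map` stands in for the Redis set keyed `ssw:{sessionFriendlyId}:{io}` (which the real service drains atomically), and both function names are hypothetical, not the webapp's actual exports:

```typescript
// In-memory stand-in for the Redis pending-waitpoint set. The real
// implementation keys a Redis set by `ssw:{sessionFriendlyId}:{io}` and
// drains it atomically on each append.
const pending = new Map<string, Set<string>>();

function channelKey(sessionFriendlyId: string, io: string): string {
  return `ssw:${sessionFriendlyId}:${io}`;
}

// Register a waitpoint on a session channel (hypothetical helper name).
export function addSessionStreamWaitpoint(
  sessionFriendlyId: string,
  io: string,
  waitpointId: string
): void {
  const key = channelKey(sessionFriendlyId, io);
  const set = pending.get(key) ?? new Set<string>();
  set.add(waitpointId);
  pending.set(key, set);
}

// Called on append: remove the entire set in one step so every concurrent
// waiter (e.g. multi-tab chat) resumes on the same record.
export function drainSessionStreamWaitpoints(
  sessionFriendlyId: string,
  io: string
): string[] {
  const key = channelKey(sessionFriendlyId, io);
  const set = pending.get(key);
  if (!set) return [];
  pending.delete(key);
  return [...set];
}
```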
28 changes: 28 additions & 0 deletions apps/webapp/app/entry.server.tsx
@@ -23,6 +23,34 @@ import {
registerRunEngineEventBusHandlers,
setupBatchQueueCallbacks,
} from "./v3/runEngineHandlers.server";
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
import { signalsEmitter } from "./services/signals.server";

// Start the sessions replication service (subscribes to the logical replication
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
// runs deterministically on webapp boot rather than lazily via a singleton
// reference elsewhere in the module graph.
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
sessionsReplicationInstance
.start()
.then(() => {
console.log("🗃️ Sessions replication service started");
})
.catch((error) => {
console.error("🗃️ Sessions replication service failed to start", {
error,
});
});

signalsEmitter.on(
"SIGTERM",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
signalsEmitter.on(
"SIGINT",
sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
);
Comment on lines +45 to +52
⚠️ Potential issue | 🟠 Major

Wrap async shutdown signal handlers with .catch to avoid unhandled rejections.

sessionsReplicationInstance.shutdown is async and can reject (e.g., _replicationClient.stop() errors). Without a .catch, a rejection during SIGTERM/SIGINT becomes an unhandled promise rejection while the process is already tearing down. This is the pattern used elsewhere in the codebase (see apps/webapp/app/v3/dynamicFlushScheduler.server.ts line 145-160).

♻️ Proposed fix
-  signalsEmitter.on(
-    "SIGTERM",
-    sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
-  );
-  signalsEmitter.on(
-    "SIGINT",
-    sessionsReplicationInstance.shutdown.bind(sessionsReplicationInstance)
-  );
+  const shutdownSessionsReplication = () =>
+    sessionsReplicationInstance.shutdown().catch((error) => {
+      console.error("🗃️ Sessions replication service shutdown error", { error });
+    });
+  signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
+  signalsEmitter.on("SIGINT", shutdownSessionsReplication);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `apps/webapp/app/entry.server.tsx` around lines 45 - 52, the SIGTERM/SIGINT
handlers currently bind the async method sessionsReplicationInstance.shutdown
directly, which can produce unhandled promise rejections; change the handlers
registered via signalsEmitter.on("SIGTERM", ...) and signalsEmitter.on("SIGINT",
...) to call an async wrapper that invokes
sessionsReplicationInstance.shutdown() and attaches a .catch(...) to log or
swallow errors (mirror the pattern used in dynamicFlushScheduler.server for safe
async signal handling), ensuring any rejection from _replicationClient.stop() is
handled.

}

const ABORT_DELAY = 30000;

32 changes: 32 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -1220,6 +1220,38 @@ const EnvironmentSchema = z
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),

// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
// with the runs replicator for leader locking but has its own slot and
// publication so the two consume independently.
SESSION_REPLICATION_CLICKHOUSE_URL: z.string().optional(),
SESSION_REPLICATION_ENABLED: z.string().default("0"),
SESSION_REPLICATION_SLOT_NAME: z.string().default("sessions_to_clickhouse_v1"),
SESSION_REPLICATION_PUBLICATION_NAME: z
.string()
.default("sessions_to_clickhouse_v1_publication"),
SESSION_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(1),
SESSION_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
SESSION_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
SESSION_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
SESSION_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
SESSION_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
SESSION_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
SESSION_REPLICATION_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
SESSION_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
SESSION_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
SESSION_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),

// Clickhouse
CLICKHOUSE_URL: z.string(),
CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
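The insert retry settings above imply an exponential backoff starting at `SESSION_REPLICATION_INSERT_BASE_DELAY_MS` and capped at `SESSION_REPLICATION_INSERT_MAX_DELAY_MS`. A minimal sketch of that delay curve under the defaults — a hypothetical helper, since the actual replication service may add jitter or shape delays differently:

```typescript
// Sketch of the retry-delay curve implied by the env defaults above
// (base 100ms, doubling per attempt, capped at 2000ms). Hypothetical helper.
export function insertRetryDelayMs(
  attempt: number,
  baseDelayMs = 100,
  maxDelayMs = 2000
): number {
  // attempt 0 → base; each retry doubles until hitting the cap.
  return Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
}
```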
165 changes: 165 additions & 0 deletions apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts
@@ -0,0 +1,165 @@
import { json } from "@remix-run/server-runtime";
import {
CreateSessionStreamWaitpointRequestBody,
type CreateSessionStreamWaitpointResponseBody,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { z } from "zod";
import { $replica } from "~/db.server";
import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import {
addSessionStreamWaitpoint,
removeSessionStreamWaitpoint,
} from "~/services/sessionStreamWaitpointCache.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { engine } from "~/v3/runEngine.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";

const ParamsSchema = z.object({
runFriendlyId: z.string(),
});

const { action, loader } = createActionApiRoute(
{
params: ParamsSchema,
body: CreateSessionStreamWaitpointRequestBody,
maxContentLength: 1024 * 10, // 10KB
method: "POST",
},
async ({ authentication, body, params }) => {
try {
const run = await $replica.taskRun.findFirst({
where: {
friendlyId: params.runFriendlyId,
runtimeEnvironmentId: authentication.environment.id,
},
select: {
id: true,
friendlyId: true,
realtimeStreamsVersion: true,
},
});

if (!run) {
return json({ error: "Run not found" }, { status: 404 });
}

const session = await resolveSessionByIdOrExternalId(
$replica,
authentication.environment.id,
body.session
);

if (!session) {
return json({ error: "Session not found" }, { status: 404 });
}

const idempotencyKeyExpiresAt = body.idempotencyKeyTTL
? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL)
: undefined;

const timeout = await parseDelay(body.timeout);

const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags;

if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) {
throw new ServiceValidationError(
`Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.`
);
}

if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
await createWaitpointTag({
tag,
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
});
}
}

// Step 1: Create the waitpoint.
const result = await engine.createManualWaitpoint({
environmentId: authentication.environment.id,
projectId: authentication.environment.projectId,
idempotencyKey: body.idempotencyKey,
idempotencyKeyExpiresAt,
timeout,
tags: bodyTags,
});

// Step 2: Register the waitpoint on the session channel so the next
// append fires it. Keyed by (sessionFriendlyId, io) — both runs on a
// multi-tab session wake on the same record.
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await addSessionStreamWaitpoint(
session.friendlyId,
body.io,
result.waitpoint.id,
ttlMs && ttlMs > 0 ? ttlMs : undefined
);

// Step 3: Race-check. If a record landed on the channel before this
// .wait() call, complete the waitpoint synchronously with that data
// and remove the pending registration.
if (!result.isCached) {
try {
const realtimeStream = getRealtimeStreamInstance(
authentication.environment,
run.realtimeStreamsVersion
);

if (realtimeStream instanceof S2RealtimeStreams) {
const records = await realtimeStream.readSessionStreamRecords(
session.friendlyId,
body.io,
body.lastSeqNum
);

if (records.length > 0) {
const record = records[0]!;

await engine.completeWaitpoint({
id: result.waitpoint.id,
output: {
value: record.data,
type: "application/json",
isError: false,
},
});

await removeSessionStreamWaitpoint(
session.friendlyId,
body.io,
result.waitpoint.id
);
}
}
} catch {
// Non-fatal: pending registration stays in Redis; the next append
// will complete the waitpoint via the append handler path.
}
}

return json<CreateSessionStreamWaitpointResponseBody>({
waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id),
isCached: result.isCached,
});
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 500 });
}

return json({ error: "Something went wrong" }, { status: 500 });
}
Comment on lines +143 to +161
⚠️ Potential issue | 🟡 Minor

Consider logging the swallowed race-check error and not leaking raw error messages.

Two related concerns in the error-handling tail:

  1. Line 143-146: the catch {} silently drops any failure from the race-check (S2 read / engine complete / cache remove). If this path has latent bugs you'll only notice via timeouts, since nothing is recorded. Please at least logger.warn/logger.error with sessionFriendlyId, io, and waitpointId so ops can tell cached-drain-on-append from broken race-checks.
  2. Line 156-158: return json({ error: error.message }, { status: 500 }) forwards arbitrary internal error messages (e.g., Prisma, engine, S2) to the client. Per the PR commit message "log and swallow unexpected internal errors instead of returning raw Prisma/internal messages that could leak details", this route should follow the same pattern — log the error server-side and return a generic 500 message.
♻️ Suggested fix
-        } catch {
-          // Non-fatal: pending registration stays in Redis; the next append
-          // will complete the waitpoint via the append handler path.
-        }
+        } catch (error) {
+          // Non-fatal: pending registration stays in Redis; the next append
+          // will complete the waitpoint via the append handler path.
+          logger.warn("session-stream wait race-check failed", {
+            sessionFriendlyId: session.friendlyId,
+            io: body.io,
+            waitpointId: result.waitpoint.id,
+            error,
+          });
+        }
...
-    } catch (error) {
-      if (error instanceof ServiceValidationError) {
-        return json({ error: error.message }, { status: 422 });
-      } else if (error instanceof Error) {
-        return json({ error: error.message }, { status: 500 });
-      }
-
-      return json({ error: "Something went wrong" }, { status: 500 });
-    }
+    } catch (error) {
+      if (error instanceof ServiceValidationError) {
+        return json({ error: error.message }, { status: 422 });
+      }
+      logger.error("Failed to create session-stream waitpoint", { error });
+      return json({ error: "Something went wrong" }, { status: 500 });
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts`
around lines 143 - 161, the empty catch after the race-check should not swallow
errors — replace it with a logger.warn/error that includes the context
(sessionFriendlyId, io, and WaitpointId.toFriendlyId(result.waitpoint.id) or
waitpointId variable) and the caught error so ops can distinguish cached-drain
vs broken race-checks; and in the outer catch branch that currently does `return
json({ error: error.message }, { status: 500 })` do not return raw internal
messages — log the error (with sessionFriendlyId/io/waitpointId/context) and
return a generic 500 response like `{ error: "Something went wrong" }` while
preserving the existing 422 handling for ServiceValidationError.

}
);

export { action, loader };
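The route above converts the absolute waitpoint timeout into a relative TTL for the Redis registration, discarding non-positive values. That guard, factored out as a pure function — an illustrative helper, not one of the route's actual exports:

```typescript
// Convert an absolute timeout Date into a relative TTL in ms for the
// pending-waitpoint registration; non-positive TTLs are dropped so an
// already-expired timeout never creates a zero/negative Redis expiry.
// Hypothetical helper mirroring the inline logic in the wait route.
export function ttlFromTimeout(
  timeout: Date | undefined,
  now: number = Date.now()
): number | undefined {
  if (!timeout) return undefined;
  const ttlMs = timeout.getTime() - now;
  return ttlMs > 0 ? ttlMs : undefined;
}
```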
61 changes: 61 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.close.ts
@@ -0,0 +1,61 @@
import { json } from "@remix-run/server-runtime";
import {
CloseSessionRequestBody,
type RetrieveSessionResponseBody,
} from "@trigger.dev/core/v3";
import { z } from "zod";
import { prisma } from "~/db.server";
import {
resolveSessionByIdOrExternalId,
serializeSession,
} from "~/services/realtime/sessions.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
session: z.string(),
});

const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: CloseSessionRequestBody,
maxContentLength: 1024,
method: "POST",
allowJWT: true,
corsStrategy: "all",
authorization: {
action: "admin",
resource: (params) => ({ sessions: params.session }),
superScopes: ["admin:sessions", "admin:all", "admin"],
},
},
async ({ authentication, params, body }) => {
const existing = await resolveSessionByIdOrExternalId(
prisma,
authentication.environment.id,
params.session
);

if (!existing) {
return json({ error: "Session not found" }, { status: 404 });
}

// Idempotent: if already closed, return the current row without clobbering
// the original closedAt / closedReason.
if (existing.closedAt) {
return json<RetrieveSessionResponseBody>(serializeSession(existing));
}

const updated = await prisma.session.update({
where: { id: existing.id },
data: {
closedAt: new Date(),
closedReason: body.reason ?? null,
},
});
Comment on lines +45 to +55
⚠️ Potential issue | 🟡 Minor

Minor: idempotent close has a TOCTOU — concurrent closes can overwrite closedAt/closedReason.

Two concurrent POST /close calls can both observe existing.closedAt == null and both run the update, so the second overwrites the first's closedAt timestamp and closedReason. Given the PR description states terminal markers are "write-once, never flipped back", you can enforce that at the DB level by scoping the update with closedAt: null and falling back to re-reading when the update affects zero rows.

♻️ Suggested fix
-    const updated = await prisma.session.update({
-      where: { id: existing.id },
-      data: {
-        closedAt: new Date(),
-        closedReason: body.reason ?? null,
-      },
-    });
-
-    return json<RetrieveSessionResponseBody>(serializeSession(updated));
+    const { count } = await prisma.session.updateMany({
+      where: { id: existing.id, closedAt: null },
+      data: {
+        closedAt: new Date(),
+        closedReason: body.reason ?? null,
+      },
+    });
+
+    // updateMany doesn't return the row, and count === 0 means a concurrent
+    // close won the race — either way, re-read to return the persisted values.
+    const final = await prisma.session.findFirst({ where: { id: existing.id } });
+
+    if (!final) {
+      return json({ error: "Session not found" }, { status: 404 });
+    }
+
+    return json<RetrieveSessionResponseBody>(serializeSession(final));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    if (existing.closedAt) {
-      return json<RetrieveSessionResponseBody>(serializeSession(existing));
-    }
-    const updated = await prisma.session.update({
-      where: { id: existing.id },
-      data: {
-        closedAt: new Date(),
-        closedReason: body.reason ?? null,
-      },
-    });
+    if (existing.closedAt) {
+      return json<RetrieveSessionResponseBody>(serializeSession(existing));
+    }
+    const { count } = await prisma.session.updateMany({
+      where: { id: existing.id, closedAt: null },
+      data: {
+        closedAt: new Date(),
+        closedReason: body.reason ?? null,
+      },
+    });
+    // Whether we won the race (count === 1) or lost it (count === 0),
+    // re-read so the response reflects the values actually persisted.
+    const final = await prisma.session.findFirst({ where: { id: existing.id } });
+    if (!final) {
+      return json({ error: "Session not found" }, { status: 404 });
+    }
+    return json<RetrieveSessionResponseBody>(serializeSession(final));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `apps/webapp/app/routes/api.v1.sessions.$session.close.ts` around lines 45 -
55, the close endpoint currently does a read-then-update which has a TOCTOU
race: when updating the session in the block that calls prisma.session.update
(using existing.id and setting closedAt/closedReason), change the update to be
conditional by adding closedAt: null to the where clause so the DB only writes
if not already closed; after the conditional update, if no row was affected
(update returned null or zero), re-query the session (the
existing/serializeSession path) and return that value to ensure idempotent,
write-once behavior for closedAt/closedReason.


return json<RetrieveSessionResponseBody>(serializeSession(updated));
}
);

export { action };
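The write-once close semantics discussed in the review can be modeled as a pure function — a sketch over a plain object rather than Prisma, with hypothetical names, showing that only the first close sets `closedAt`/`closedReason` and later calls return the row unchanged:

```typescript
// Plain-object model of the idempotent close: the terminal markers are
// write-once and never flipped back or overwritten. Hypothetical helper,
// not the route's actual implementation.
interface SessionRow {
  id: string;
  closedAt: Date | null;
  closedReason: string | null;
}

export function closeSession(
  row: SessionRow,
  reason?: string,
  now: Date = new Date()
): SessionRow {
  // Idempotent: never clobber the original closedAt / closedReason.
  if (row.closedAt) return row;
  return { ...row, closedAt: now, closedReason: reason ?? null };
}
```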