
fix: handle fast-completion race in batch streaming seal check#3427

Open
matt-aitken wants to merge 2 commits into main from tri-8700-batch-fast-completion-race

Conversation

@matt-aitken
Member

Problem

When batchTrigger() is called with large payloads, each item's payload is uploaded to R2 server-side during the streaming loop before being enqueued. This makes the loop slow — around 3 seconds per item. Workers pick up and execute each item as it's enqueued, running concurrently with the ongoing stream.

For the last item in the batch, a race exists between the streaming loop finishing and the batch completion cleanup:

  1. The loop enqueues the last item and returns from enqueueBatchItem()
  2. A waiting worker picks up the item almost instantly and executes it
  3. recordSuccess() fires, processedCount hits the expected total, finalizeBatch() runs
  4. cleanup() deletes all Redis keys for the batch, including enqueuedItemsKey
  5. The streaming loop exits and calls getBatchEnqueuedCount() — reads the now-deleted key — returns 0

The count check finds enqueuedCount (0) !== batch.runCount and falls through to a Postgres fallback, but that fallback checked only sealed. The BatchQueue completion path sets status = COMPLETED in Postgres without setting sealed = true (sealing is the streaming endpoint's job), so the fallback misses this state too.

This causes the endpoint to return sealed: false. The SDK treats this as retryable and retries up to 5 times with exponential backoff. Each retry calls enqueueBatchItem(), which reads the batch meta key from Redis — also deleted by cleanup() — and throws "Batch not found or not initialized" (500). The final retry gets a 422 because the batch is already COMPLETED, which the SDK does not retry, causing an ApiError to be thrown from await batchTrigger() in the parent run — even though all child runs completed successfully.
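The client-side retry behavior described above can be sketched as follows. This is a hypothetical illustration only — `streamItemsWithRetry`, the attempt counting, and the backoff delays are assumptions for clarity, not the real SDK code:

```typescript
// Hypothetical sketch of the SDK behavior described above: sealed:false is
// treated as retryable, so the client backs off and resends. Once cleanup()
// has deleted the batch's Redis keys, every retry fails the same way.
async function streamItemsWithRetry(
  send: () => Promise<{ sealed: boolean }>,
  maxAttempts = 5
): Promise<{ sealed: boolean }> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const result = await send();
    if (result.sealed) return result; // normal path: batch sealed, done
    // Exponential backoff before retrying (delays are illustrative).
    await new Promise((resolve) => setTimeout(resolve, 2 ** attempt * 100));
  }
  // Exhausting retries ultimately surfaces as an ApiError from
  // await batchTrigger() in the parent run, even though every child
  // run actually completed.
  throw new Error("batch not sealed after retries");
}
```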

Fix

In the Postgres fallback inside StreamBatchItemsService, also check status === "COMPLETED" alongside sealed. This covers the fast-completion path where the BatchQueue finishes all runs before the streaming endpoint gets to seal the batch normally.

Also switches findUnique to findFirst per webapp convention.
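A minimal sketch of the broadened fallback condition — simplified types for illustration; the real service re-queries the Prisma batch record inside StreamBatchItemsService:

```typescript
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTED";
type BatchRecord = { sealed: boolean; status: BatchStatus };

// Postgres fallback for when cleanup() has already deleted the Redis
// enqueued-count key: treat the batch as handled if it was sealed normally
// OR if the BatchQueue completed it (which never sets sealed=true).
function isBatchAlreadyHandled(batch: BatchRecord | null): boolean {
  if (!batch) return false;
  return batch.sealed || batch.status === "COMPLETED";
}
```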

When all batch runs complete before getBatchEnqueuedCount() is called,
cleanup() has already deleted the enqueuedItemsKey in Redis, causing it
to return 0. The existing Postgres fallback only checked sealed, but the
BatchQueue completion path sets status=COMPLETED without setting
sealed=true. Add the status check so the endpoint returns sealed:true
instead of triggering SDK retries into a dead BatchQueue.

Also switch findUnique to findFirst per webapp convention.
@changeset-bot

changeset-bot Bot commented Apr 22, 2026

⚠️ No Changeset found

Latest commit: ad29045

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@coderabbitai
Contributor

coderabbitai Bot commented Apr 22, 2026

No actionable comments were generated in the recent review. 🎉

📥 Commits

Reviewing files that changed from the base of the PR and between 43e8543 and ad29045.

📒 Files selected for processing (2)
  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
  • apps/webapp/test/engine/streamBatchItems.test.ts
🔇 Additional comments (3)
apps/webapp/app/runEngine/services/streamBatchItems.server.ts (2)

213-264: LGTM — fast-completion fallback correctly handled.

Accepting status === "COMPLETED" in the enqueued-count mismatch branch (with the findUnique → findFirst switch) cleanly covers the case where BatchQueue.cleanup() deleted enqueuedItemsKey before getBatchEnqueuedCount() ran. The updated comment makes the timing window clear.

As per coding guidelines "Always use findFirst instead of findUnique in Prisma queries." → compliant.


283-330: LGTM — post-seal race now accepts COMPLETED as terminal.

The broadened condition (sealed && status === "PROCESSING") || status === "COMPLETED" correctly absorbs the window between tryCompleteBatch (which sets status=COMPLETED but not sealed=true) and cleanup() without incorrectly masking genuine error states like ABORTED (still exercised by the existing "unexpected state" test). The added batchStatus / batchSealed logging will make future diagnoses much easier.

As per coding guidelines "Always use findFirst instead of findUnique in Prisma queries." → compliant.

apps/webapp/test/engine/streamBatchItems.test.ts (1)

387-509: LGTM — test faithfully reproduces the fast-completion race.

The racingPrisma wrapper models the real scenario precisely: tryCompleteBatch flipping status to "COMPLETED" without touching sealed, causing the conditional updateMany(where: status="PENDING") to return count: 0. The final DB assertion that sealed remains false while the service still returns sealed: true is the important invariant, since the BatchQueue completion path intentionally never sets sealed=true. Good coverage for the fix.


Walkthrough

When the observed enqueuedCount differs from batch.runCount, the endpoint now re-queries the batch with findFirst and treats the batch as already handled if sealed is true or status is "COMPLETED". If an updateMany attempt to seal loses the race (returns count 0), the code re-queries and accepts COMPLETED as a terminal condition even when sealed remains false. An inline comment clarifies a Redis cleanup timing window where enqueuedItemsKey may be deleted before the count read. Logging was augmented to include batchStatus and batchSealed for these concurrent outcomes.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Title check ✅ Passed The title clearly and specifically describes the main change: handling a race condition in batch streaming seal check when completion happens quickly.
Description check ✅ Passed The description provides comprehensive context about the problem, the race condition scenario, and the fix applied, all directly related to the changeset.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.



devin-ai-integration[bot]

This comment was marked as resolved.

The second race check at the seal updateMany only accepted
sealed=true && status=PROCESSING. But the BatchQueue completion path
can set status=COMPLETED (without sealed=true) between getEnqueuedCount
and the seal updateMany, causing the check to reject a legitimate
success state and throw ServiceValidationError.

Also switch the post-seal re-query from findUnique to findFirst per
webapp convention.

Co-Authored-By: Matt Aitken <matt@mattaitken.com>
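The broadened check at this second race point might look like the following sketch — assumed simplified shapes; the actual service re-queries Prisma after the conditional seal updateMany returns count 0:

```typescript
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTED";

// After the conditional seal updateMany loses the race (count: 0), re-query
// the batch and accept either legitimate outcome:
//  - another request sealed it while it was still PROCESSING, or
//  - the BatchQueue completed every run without ever setting sealed=true.
// Anything else (e.g. ABORTED) remains an unexpected state and should
// still raise a ServiceValidationError.
function isSealRaceBenign(batch: { sealed: boolean; status: BatchStatus }): boolean {
  return (batch.sealed && batch.status === "PROCESSING") || batch.status === "COMPLETED";
}
```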
