fix: handle fast-completion race in batch streaming seal check#3427

Open
matt-aitken wants to merge 1 commit into main from tri-8700-batch-fast-completion-race

Conversation

@matt-aitken
Member

Problem

When batchTrigger() is called with large payloads, each item's payload is uploaded to R2 server-side during the streaming loop before being enqueued. This makes the loop slow — around 3 seconds per item. Workers pick up and execute each item as it's enqueued, running concurrently with the ongoing stream.
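
For context, a minimal sketch of the calling side (the task ids, payload shape, and fan-out logic here are hypothetical illustrations, not code from this repo):

```ts
import { task } from "@trigger.dev/sdk";

// Hypothetical child task with a large payload field.
export const processDocument = task({
  id: "process-document",
  run: async (payload: { documentId: string; content: string }) => {
    // ... do work ...
    return { documentId: payload.documentId };
  },
});

// Hypothetical parent task fanning out a large batch. Each item's payload is
// offloaded to object storage (R2) server-side while the batch streams, which
// is what makes the enqueue loop slow.
export const processAllDocuments = task({
  id: "process-all-documents",
  run: async (payload: { docs: { documentId: string; content: string }[] }) => {
    const handle = await processDocument.batchTrigger(
      payload.docs.map((doc) => ({ payload: doc }))
    );
    return handle;
  },
});
```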

For the last item in the batch, a race exists between the streaming loop finishing and the batch completion cleanup:

  1. The loop enqueues the last item and returns from enqueueBatchItem()
  2. A waiting worker picks up the item almost instantly and executes it
  3. recordSuccess() fires, processedCount hits the expected total, finalizeBatch() runs
  4. cleanup() deletes all Redis keys for the batch, including enqueuedItemsKey
  5. The streaming loop exits and calls getBatchEnqueuedCount(), which reads the now-deleted key and returns 0

The count check finds enqueuedCount (0) !== batch.runCount and falls through to a Postgres fallback, but that fallback only checked sealed. The BatchQueue completion path sets status = COMPLETED in Postgres without setting sealed = true (that's the streaming endpoint's job), so the fallback misses this case too.

This causes the endpoint to return sealed: false. The SDK treats this as retryable and retries up to 5 times with exponential backoff. Each retry calls enqueueBatchItem(), which reads the batch meta key from Redis — also deleted by cleanup() — and throws "Batch not found or not initialized" (500). The final retry gets a 422 because the batch is already COMPLETED, which the SDK does not retry, causing an ApiError to be thrown from await batchTrigger() in the parent run — even though all child runs completed successfully.
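
To make the failure mode concrete, here is a rough sketch of the pre-fix seal check (model and field names are assumed from the description above; this is not the exact code in StreamBatchItemsService):

```ts
// Sketch of the pre-fix behaviour (simplified; names are assumptions).
const enqueuedCount = await getBatchEnqueuedCount(batchId); // returns 0: key already deleted by cleanup()

if (enqueuedCount !== batch.runCount) {
  // Postgres fallback: only `sealed` was consulted.
  const currentBatch = await prisma.batchTaskRun.findUnique({
    where: { id: batchId },
    select: { sealed: true, status: true },
  });

  if (currentBatch?.sealed) {
    return { sealed: true };
  }

  // Fast-completion path: status is already "COMPLETED" but sealed is still
  // false, so we fall through and report sealed: false, and the SDK then
  // retries against a batch whose Redis state has been cleaned up.
  return { sealed: false };
}
```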

Fix

In the Postgres fallback inside StreamBatchItemsService, also check status === "COMPLETED" alongside sealed. This covers the fast-completion path where the BatchQueue finishes all runs before the streaming endpoint gets to seal the batch normally.

Also switches findUnique to findFirst per webapp convention.
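
A minimal sketch of the adjusted fallback, assuming the same names as above (the real change lives in apps/webapp/app/runEngine/services/streamBatchItems.server.ts):

```ts
// Sketch of the fixed fallback check (simplified; names are assumptions).
const currentBatch = await prisma.batchTaskRun.findFirst({
  where: { id: batchId },
  select: { sealed: true, status: true },
});

// Treat the batch as handled if it was sealed by a concurrent request OR has
// already been completed via the BatchQueue fast-completion path.
if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
  return { sealed: true };
}
```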

When all batch runs complete before getBatchEnqueuedCount() is called,
cleanup() has already deleted the enqueuedItemsKey in Redis, causing it
to return 0. The existing Postgres fallback only checked sealed, but the
BatchQueue completion path sets status=COMPLETED without setting
sealed=true. Add the status check so the endpoint returns sealed:true
instead of triggering SDK retries into a dead BatchQueue.

Also switch findUnique to findFirst per webapp convention.
@changeset-bot

changeset-bot Bot commented Apr 22, 2026

⚠️ No Changeset found

Latest commit: 43e8543

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@coderabbitai
Contributor

coderabbitai Bot commented Apr 22, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: b1a3a4a4-b483-4e60-a072-9ceba2010b73

📥 Commits

Reviewing files that changed from the base of the PR and between 8eb596f and 43e8543.

📒 Files selected for processing (1)
  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📜 Recent review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (28)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
  • GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
  • GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
  • GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
  • GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: sdk-compat / Deno Runtime
  • GitHub Check: sdk-compat / Bun Runtime
  • GitHub Check: sdk-compat / Cloudflare Workers
  • GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: Analyze (javascript-typescript)
🧰 Additional context used
📓 Path-based instructions (8)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
{packages/core,apps/webapp}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use zod for validation in packages/core and apps/webapp

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Use function declarations instead of default exports

Add crumbs as you write code using `// @crumbs` comments or `// #region @crumbs` blocks. These are temporary debug instrumentation and must be stripped using `agentcrumbs strip` before merge.

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.ts

📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)

**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}

📄 CodeRabbit inference engine (AGENTS.md)

Format code using Prettier before committing

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
**/*.ts{,x}

📄 CodeRabbit inference engine (CLAUDE.md)

Always import from @trigger.dev/sdk when writing Trigger.dev tasks. Never use @trigger.dev/sdk/v3 or deprecated client.defineJob.

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
apps/webapp/**/*.{ts,tsx}

📄 CodeRabbit inference engine (.cursor/rules/webapp.mdc)

apps/webapp/**/*.{ts,tsx}: Access environment variables through the env export of env.server.ts instead of directly accessing process.env
Use subpath exports from @trigger.dev/core package instead of importing from the root @trigger.dev/core path

Use named constants for sentinel/placeholder values (e.g. const UNSET_VALUE = '__unset__') instead of raw string literals scattered across comparisons

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
apps/webapp/**/*.server.ts

📄 CodeRabbit inference engine (apps/webapp/CLAUDE.md)

apps/webapp/**/*.server.ts: Never use request.signal for detecting client disconnects. Use getRequestAbortSignal() from app/services/httpAsyncStorage.server.ts instead, which is wired directly to Express res.on('close') and fires reliably
Access environment variables via env export from app/env.server.ts. Never use process.env directly
Always use findFirst instead of findUnique in Prisma queries. findUnique has an implicit DataLoader that batches concurrent calls and has active bugs even in Prisma 6.x (uppercase UUIDs returning null, composite key SQL correctness issues, 5-10x worse performance). findFirst is never batched and avoids this entire class of issues

Files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
🧠 Learnings (14)
📓 Common learnings
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3368
File: internal-packages/database/prisma/schema.prisma:666-666
Timestamp: 2026-04-16T14:21:14.907Z
Learning: In `triggerdotdev/trigger.dev`, the `BackgroundWorkerTask` covering index on `(runtimeEnvironmentId, slug, triggerSource)` lives in `internal-packages/database/prisma/migrations/20260413000000_add_bwt_covering_index/migration.sql` as a `CREATE INDEX CONCURRENTLY IF NOT EXISTS`, intentionally in its own migration file separate from the `TaskIdentifier` table migration. Do not flag this index as missing from the schema migrations in future reviews.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3333
File: apps/webapp/app/services/runsReplicationService.server.ts:655-685
Timestamp: 2026-04-17T13:20:11.004Z
Learning: In `apps/webapp/app/services/runsReplicationService.server.ts`, the per-ClickHouse-group inserts inside `#flushBatch` are intentionally serialized (sequential) by default. Parallelizing group flushes causes Linux socket write-buffer pressure that required kernel tuning (`net.ipv4.tcp_wmem` set to `4096 20480 8388608`). Parallel flushing may be exposed as an opt-in via a `groupFlushStrategy` env var/option, but sequential must remain the safe default. Do not flag the sequential loop as a performance issue without acknowledging this constraint.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3417
File: apps/webapp/app/services/sessionsReplicationService.server.ts:224-231
Timestamp: 2026-04-20T14:50:16.440Z
Learning: In `apps/webapp/app/services/sessionsReplicationService.server.ts`, the acknowledge-before-flush pattern is intentional and mirrors `runsReplicationService.server.ts`. `_latestCommitEndLsn` is updated at Postgres commit time and acknowledged on a periodic interval via `#acknowledgeLatestTransaction`, without waiting for ClickHouse batch flush to complete. Do not flag this as a durability/ordering issue — this at-least-once delivery trade-off is an established project-wide convention for both runs and sessions replication services.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:08:03.862Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3368
File: apps/webapp/app/services/taskIdentifierRegistry.server.ts:24-67
Timestamp: 2026-04-13T21:44:00.032Z
Learning: In `apps/webapp/app/services/taskIdentifierRegistry.server.ts`, the sequential upsert/updateMany/findMany writes in `syncTaskIdentifiers` are intentionally NOT wrapped in a Prisma transaction. This function runs only during deployment-change events (low-concurrency path), and any partial `isInLatestDeployment` state is acceptable because it self-corrects on the next deployment. Do not flag this as a missing-transaction/atomicity issue in future reviews.
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.
📚 Learning: 2026-04-13T21:44:00.032Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3368
File: apps/webapp/app/services/taskIdentifierRegistry.server.ts:24-67
Timestamp: 2026-04-13T21:44:00.032Z
Learning: In `apps/webapp/app/services/taskIdentifierRegistry.server.ts`, the sequential upsert/updateMany/findMany writes in `syncTaskIdentifiers` are intentionally NOT wrapped in a Prisma transaction. This function runs only during deployment-change events (low-concurrency path), and any partial `isInLatestDeployment` state is acceptable because it self-corrects on the next deployment. Do not flag this as a missing-transaction/atomicity issue in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-17T13:20:11.004Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3333
File: apps/webapp/app/services/runsReplicationService.server.ts:655-685
Timestamp: 2026-04-17T13:20:11.004Z
Learning: In `apps/webapp/app/services/runsReplicationService.server.ts`, the per-ClickHouse-group inserts inside `#flushBatch` are intentionally serialized (sequential) by default. Parallelizing group flushes causes Linux socket write-buffer pressure that required kernel tuning (`net.ipv4.tcp_wmem` set to `4096 20480 8388608`). Parallel flushing may be exposed as an opt-in via a `groupFlushStrategy` env var/option, but sequential must remain the safe default. Do not flag the sequential loop as a performance issue without acknowledging this constraint.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-16T14:19:16.309Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: apps/webapp/CLAUDE.md:0-0
Timestamp: 2026-04-16T14:19:16.309Z
Learning: Applies to apps/webapp/app/v3/services/{cancelTaskRun,batchTriggerV3}.server.ts : When editing services that branch on `RunEngineVersion` to support both V1 and V2 (e.g., `cancelTaskRun.server.ts`, `batchTriggerV3.server.ts`), only modify V2 code paths

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2025-07-12T18:06:04.133Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2264
File: apps/webapp/app/services/runsRepository.server.ts:172-174
Timestamp: 2025-07-12T18:06:04.133Z
Learning: In apps/webapp/app/services/runsRepository.server.ts, the in-memory status filtering after fetching runs from Prisma is intentionally used as a workaround for ClickHouse data delays. This approach is acceptable because the result set is limited to a maximum of 100 runs due to pagination, making the performance impact negligible.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-16T14:19:16.309Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: apps/webapp/CLAUDE.md:0-0
Timestamp: 2026-04-16T14:19:16.309Z
Learning: Applies to apps/webapp/{app/v3/services/triggerTask.server.ts,app/v3/services/batchTriggerV3.server.ts} : In `triggerTask.server.ts` and `batchTriggerV3.server.ts`, do NOT add database queries. Task defaults (TTL, etc.) are resolved via `backgroundWorkerTask.findFirst()` in the queue concern (`queues.server.ts`). Piggyback on the existing query instead of adding new ones

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-20T14:50:16.440Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3417
File: apps/webapp/app/services/sessionsReplicationService.server.ts:224-231
Timestamp: 2026-04-20T14:50:16.440Z
Learning: In `apps/webapp/app/services/sessionsReplicationService.server.ts`, the acknowledge-before-flush pattern is intentional and mirrors `runsReplicationService.server.ts`. `_latestCommitEndLsn` is updated at Postgres commit time and acknowledged on a periodic interval via `#acknowledgeLatestTransaction`, without waiting for ClickHouse batch flush to complete. Do not flag this as a durability/ordering issue — this at-least-once delivery trade-off is an established project-wide convention for both runs and sessions replication services.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-07T14:12:59.018Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3331
File: apps/webapp/app/runEngine/concerns/batchPayloads.server.ts:112-136
Timestamp: 2026-04-07T14:12:59.018Z
Learning: In `apps/webapp/app/runEngine/concerns/batchPayloads.server.ts`, the `pRetry` call wrapping `uploadPacketToObjectStore` intentionally retries **all** error types (no `shouldRetry` filter / `AbortError` guards). The maintainer explicitly prefers over-retrying to under-retrying because multiple heterogeneous object store backends are supported and it is impractical to enumerate all permanent error signatures. Do not flag this as an issue in future reviews.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-20T15:08:49.959Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3417
File: apps/webapp/app/services/sessionsReplicationService.server.ts:204-215
Timestamp: 2026-04-20T15:08:49.959Z
Learning: In `apps/webapp/app/services/sessionsReplicationService.server.ts` and `apps/webapp/app/services/runsReplicationService.server.ts`, the `getKey` function in `ConcurrentFlushScheduler` uses `${item.event}_${item.session.id}` / `${item.event}_${item.run.id}` respectively. This pattern is intentionally kept identical across both replication services for consistency. Any change to the deduplication key shape (e.g., keying solely by session/run id) must be applied to both services together, never to one service in isolation. Tracking as a cross-service follow-up.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-04-16T14:19:16.309Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: apps/webapp/CLAUDE.md:0-0
Timestamp: 2026-04-16T14:19:16.309Z
Learning: Applies to apps/webapp/app/v3/services/queues.server.ts : If adding a new task-level default, add it to the existing `select` clause in the `backgroundWorkerTask.findFirst()` query in `queues.server.ts` — do NOT add a second query. If the default doesn't need to be known at trigger time, resolve it at dequeue time instead

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-22T19:34:22.737Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3187
File: apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.errors._index/route.tsx:99-103
Timestamp: 2026-03-22T19:34:22.737Z
Learning: In `apps/webapp/app/presenters/v3/ErrorsListPresenter.server.ts`, the `statuses` filter (UNRESOLVED | RESOLVED | IGNORED) is applied in-memory after the ClickHouse query and a batch Postgres lookup via `getErrorGroupStates`. This is intentional: `status` lives in the Postgres `ErrorGroupState` table, not in ClickHouse, so post-query filtering is the correct approach. Do not flag this as a missing predicate or a no-op filter during code review.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-03T13:08:03.862Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3166
File: packages/redis-worker/src/fair-queue/index.ts:1114-1121
Timestamp: 2026-03-03T13:08:03.862Z
Learning: In packages/redis-worker/src/fair-queue/index.ts, it's acceptable for the worker queue depth cap check to allow overshooting by up to batchClaimSize messages per iteration, as the next iteration will recheck and prevent sustained growth beyond the limit.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-22T13:26:12.060Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 3244
File: apps/webapp/app/components/code/TextEditor.tsx:81-86
Timestamp: 2026-03-22T13:26:12.060Z
Learning: In the triggerdotdev/trigger.dev codebase, do not flag `navigator.clipboard.writeText(...)` calls for `missing-await`/`unhandled-promise` issues. These clipboard writes are intentionally invoked without `await` and without `catch` handlers across the project; keep that behavior consistent when reviewing TypeScript/TSX files (e.g., usages like in `apps/webapp/app/components/code/TextEditor.tsx`).

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
📚 Learning: 2026-03-22T19:24:14.403Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 3187
File: apps/webapp/app/v3/services/alerts/deliverErrorGroupAlert.server.ts:200-204
Timestamp: 2026-03-22T19:24:14.403Z
Learning: In the triggerdotdev/trigger.dev codebase, webhook URLs are not expected to contain embedded credentials/secrets (e.g., fields like `ProjectAlertWebhookProperties` should only hold credential-free webhook endpoints). During code review, if you see logging or inclusion of raw webhook URLs in error messages, do not automatically treat it as a credential-leak/secrets-in-logs issue by default—first verify the URL does not contain embedded credentials (for example, no username/password in the URL, no obvious secret/token query params or fragments). If the URL is credential-free per this project’s conventions, allow the logging.

Applied to files:

  • apps/webapp/app/runEngine/services/streamBatchItems.server.ts
🔇 Additional comments (1)
apps/webapp/app/runEngine/services/streamBatchItems.server.ts (1)

214-226: LGTM — the fallback now covers the fast-completion cleanup race.

Using findFirst and treating status === "COMPLETED" as handled matches the described BatchQueue completion path, so the endpoint can return sealed: true even after Redis cleanup removes the enqueued-items key.
As per coding guidelines, apps/webapp/**/*.server.ts: “Always use findFirst instead of findUnique in Prisma queries.”


Walkthrough

The service now handles a mismatch between the enqueued item count and batch.runCount by re-querying Postgres using findFirst to check both the sealed and status fields. If the batch is either sealed or has a status of "COMPLETED", it is treated as already handled. The inline comment describing the fast-completion/cleanup race condition is updated to clarify that Redis cleanup can occur before the count is read, accounting for the COMPLETED sealing path in addition to sealing initiated by concurrent requests.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Title check (✅ Passed): The pull request title clearly and concisely describes the main fix: handling a fast-completion race condition in batch streaming seal check.
  • Description check (✅ Passed): The pull request description provides comprehensive context including problem statement, sequence of events, root cause, and the specific fix, but is missing several required template sections like Testing, Changelog, and the completion checklist.
  • Docstring Coverage (✅ Passed): No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
  • Linked Issues check (✅ Passed): Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check (✅ Passed): Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Contributor

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 2 potential issues.

View 2 additional findings in Devin Review.


  });

- if (currentBatch?.sealed) {
+ if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
Contributor


🔴 Incomplete race condition fix: second seal-race check at line 296 doesn't handle COMPLETED status

The PR adds || currentBatch?.status === "COMPLETED" to the first race check (line 226) to handle batches that complete before the count is read. However, the same fix is not applied to the analogous second race check at streamBatchItems.server.ts:296, which only accepts currentBatch?.sealed && currentBatch.status === "PROCESSING".

This creates a race window: between getEnqueuedCount (line 210) returning the correct count (Redis keys not yet cleaned) and the seal updateMany (line 269), tryCompleteBatch (batchSystem.ts:103-110) can set the batch status to COMPLETED without setting sealed = true. When the seal attempt fails (status is no longer PENDING), the second check at line 296 rejects the COMPLETED state and throws a ServiceValidationError — even though the batch completed successfully.

Race timeline
  1. Streaming endpoint finishes enqueuing all items
  2. getEnqueuedCount returns correct count (Redis keys exist) → passes first check
  3. Meanwhile, BatchQueue's completionCallback calls tryCompleteBatch, which sets status: "COMPLETED" (but not sealed: true) — see batchSystem.ts:103-110
  4. cleanup() hasn't run yet (happens after completionCallback — see batch-queue/index.ts:1088-1091)
  5. Seal updateMany with status: "PENDING" fails → sealResult.count === 0
  6. Re-query finds sealed: false, status: "COMPLETED"
  7. Check sealed && status === "PROCESSING" → false → throws ServiceValidationError
Prompt for agents
The second race condition check at line 296 in streamBatchItems.server.ts needs the same treatment as the first check at line 226. Currently line 296 only handles the case where a concurrent request sealed the batch (sealed=true, status=PROCESSING). It should also handle the case where the batch has already completed via the BatchQueue completion path (status=COMPLETED, regardless of sealed value). The fix should mirror what was done at line 226: add an OR condition for status === COMPLETED and return a success response with sealed: true. Also, per the repo's CLAUDE.md Prisma rules, the findUnique at line 286 should be changed to findFirst.
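
A hedged sketch of what that follow-up could look like, mirroring the first check (names, the error message, and the surrounding control flow are assumptions based on the finding above, not verified against the file):

```ts
// Sketch of the suggested second-check fix (hypothetical).
const currentBatch = await prisma.batchTaskRun.findFirst({
  where: { id: batchId },
  select: { sealed: true, status: true },
});

if (
  (currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
  currentBatch?.status === "COMPLETED"
) {
  // Either a concurrent request sealed the batch, or the BatchQueue
  // completion path already marked it COMPLETED; report success instead
  // of throwing.
  return { sealed: true };
}

throw new ServiceValidationError("Failed to seal batch");
```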


Comment on lines +215 to +220
// cleaned up the Redis keys before we got here. This happens when all
// runs complete fast enough that cleanup() deletes the enqueuedItemsKey
// before we read it — typically when the last item executes in the
// milliseconds between the loop ending and getBatchEnqueuedCount() being called.
// Check both sealed (sealed by this endpoint on a concurrent request) and
// COMPLETED (sealed by the BatchQueue completion path before we got here).
Contributor


🚩 Race window between completionCallback and cleanup creates a subtle timing dependency

The PR's comments (lines 215-220) describe the race where cleanup() deletes Redis keys before getBatchEnqueuedCount() is called. However, there's actually a second relevant timing state: between completionCallback (which calls tryCompleteBatch, setting status=COMPLETED) and cleanup (which deletes the keys), as seen in batch-queue/index.ts:1082-1091. In this window, getEnqueuedCount returns the CORRECT count but the status is already COMPLETED. This window is what makes the incomplete fix at line 296 (the second race check) exploitable. The PR's comments could be more precise about these two distinct windows.



