From 43e8543dad57e6252fac4fad81b406bdcf5b0b4b Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Wed, 22 Apr 2026 23:41:50 +0100 Subject: [PATCH 1/2] fix: handle fast-completion race in StreamBatchItemsService count check When all batch runs complete before getBatchEnqueuedCount() is called, cleanup() has already deleted the enqueuedItemsKey in Redis, causing it to return 0. The existing Postgres fallback only checked sealed, but the BatchQueue completion path sets status=COMPLETED without setting sealed=true. Add the status check so the endpoint returns sealed:true instead of triggering SDK retries into a dead BatchQueue. Also switch findUnique to findFirst per webapp convention. --- .../runEngine/services/streamBatchItems.server.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 859dfe2e6b9..9929e21d239 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -212,15 +212,18 @@ export class StreamBatchItemsService extends WithRunEngine { // Validate we received the expected number of items if (enqueuedCount !== batch.runCount) { // The batch queue consumers may have already processed all items and - // cleaned up the Redis keys before we got here (especially likely when - // items include pre-failed runs that complete instantly). Check if the - // batch was already sealed/completed in Postgres. - const currentBatch = await this._prisma.batchTaskRun.findUnique({ + // cleaned up the Redis keys before we got here. This happens when all + // runs complete fast enough that cleanup() deletes the enqueuedItemsKey + // before we read it — typically when the last item executes in the + // milliseconds between the loop ending and getBatchEnqueuedCount() being called. + // Check both sealed (sealed by this endpoint on a concurrent request) and + // COMPLETED (sealed by the BatchQueue completion path before we got here). + const currentBatch = await this._prisma.batchTaskRun.findFirst({ where: { id: batchId }, select: { sealed: true, status: true }, }); - if (currentBatch?.sealed) { + if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") { logger.info("Batch already sealed before count check (fast completion)", { batchId: batchFriendlyId, itemsAccepted, From ad29045d9d202529f630d5d0ed3929fc5538331e Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 23 Apr 2026 10:28:24 +0000 Subject: [PATCH 2/2] fix: handle COMPLETED status in post-seal race check too The second race check at the seal updateMany only accepted sealed=true && status=PROCESSING. But the BatchQueue completion path can set status=COMPLETED (without sealed=true) between getEnqueuedCount and the seal updateMany, causing the check to reject a legitimate success state and throw ServiceValidationError. Also switch the post-seal re-query from findUnique to findFirst per webapp convention. Co-Authored-By: Matt Aitken --- .../services/streamBatchItems.server.ts | 24 +++- .../test/engine/streamBatchItems.test.ts | 124 ++++++++++++++++++ 2 files changed, 143 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 9929e21d239..79c84eb540d 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -282,8 +282,18 @@ export class StreamBatchItemsService extends WithRunEngine { // Check if we won the race to seal the batch if (sealResult.count === 0) { - // Another request sealed the batch first - re-query to check current state - const currentBatch = await this._prisma.batchTaskRun.findUnique({ + // The conditional update failed because the batch was no longer in + // PENDING status. Re-query to determine which path got there first: + // - A concurrent streaming request already sealed and moved it to + // PROCESSING. + // - The BatchQueue completion path finished all runs and set it to + // COMPLETED (without setting sealed=true — that's this endpoint's + // job). This window exists between completionCallback (which calls + // tryCompleteBatch) and cleanup() in BatchQueue — see + // batch-queue/index.ts. + // Either way the goal — a durable batch that the SDK stops retrying — + // has been achieved, so we return sealed: true. + const currentBatch = await this._prisma.batchTaskRun.findFirst({ where: { id: batchId }, select: { id: true, @@ -293,13 +303,17 @@ export class StreamBatchItemsService extends WithRunEngine { }, }); - if (currentBatch?.sealed && currentBatch.status === "PROCESSING") { - // The batch was sealed by another request - this is fine, the goal was achieved - logger.info("Batch already sealed by concurrent request", { + if ( + (currentBatch?.sealed && currentBatch.status === "PROCESSING") || + currentBatch?.status === "COMPLETED" + ) { + logger.info("Batch already sealed/completed by concurrent path", { batchId: batchFriendlyId, itemsAccepted, itemsDeduplicated, envId: environment.id, + batchStatus: currentBatch.status, + batchSealed: currentBatch.sealed, }); span.setAttribute("itemsAccepted", itemsAccepted); diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 2dee8668762..6e5f2264b1f 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -384,6 +384,130 @@ describe("StreamBatchItemsService", () => { } ); + containerTest( + "should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Create a batch in PENDING state + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + // Initialize the batch in Redis + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + // Enqueue items - the enqueued count check passes but the seal updateMany + // will race with tryCompleteBatch moving status to COMPLETED. + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Simulate the race where BatchQueue's completionCallback runs + // tryCompleteBatch between getEnqueuedCount and the seal updateMany. + // tryCompleteBatch sets status=COMPLETED but NOT sealed=true. + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + status: "COMPLETED", + }, + }); + // The conditional updateMany(where: status="PENDING") would now fail + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // The endpoint should accept the COMPLETED state as a success case so the + // SDK does not retry a batch whose child runs have already finished. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.status).toBe("COMPLETED"); + // sealed stays false because the BatchQueue completion path does not set + // it - that's fine, the batch is terminal. + expect(updatedBatch?.sealed).toBe(false); + + await engine.quit(); + } + ); + containerTest( "should throw error when race condition leaves batch in unexpected state", async ({ prisma, redisOptions }) => {