diff --git a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts index 859dfe2e6b..79c84eb540 100644 --- a/apps/webapp/app/runEngine/services/streamBatchItems.server.ts +++ b/apps/webapp/app/runEngine/services/streamBatchItems.server.ts @@ -212,15 +212,18 @@ export class StreamBatchItemsService extends WithRunEngine { // Validate we received the expected number of items if (enqueuedCount !== batch.runCount) { // The batch queue consumers may have already processed all items and - // cleaned up the Redis keys before we got here (especially likely when - // items include pre-failed runs that complete instantly). Check if the - // batch was already sealed/completed in Postgres. - const currentBatch = await this._prisma.batchTaskRun.findUnique({ + // cleaned up the Redis keys before we got here. This happens when all + // runs complete fast enough that cleanup() deletes the enqueuedItemsKey + // before we read it — typically when the last item executes in the + // milliseconds between the loop ending and getBatchEnqueuedCount() being called. + // Check both sealed (sealed by this endpoint on a concurrent request) and + // COMPLETED (sealed by the BatchQueue completion path before we got here). + const currentBatch = await this._prisma.batchTaskRun.findFirst({ where: { id: batchId }, select: { sealed: true, status: true }, }); - if (currentBatch?.sealed) { + if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") { logger.info("Batch already sealed before count check (fast completion)", { batchId: batchFriendlyId, itemsAccepted, @@ -279,8 +282,18 @@ export class StreamBatchItemsService extends WithRunEngine { // Check if we won the race to seal the batch if (sealResult.count === 0) { - // Another request sealed the batch first - re-query to check current state - const currentBatch = await this._prisma.batchTaskRun.findUnique({ + // The conditional update failed because the batch was no longer in + // PENDING status. Re-query to determine which path got there first: + // - A concurrent streaming request already sealed and moved it to + // PROCESSING. + // - The BatchQueue completion path finished all runs and set it to + // COMPLETED (without setting sealed=true — that's this endpoint's + // job). This window exists between completionCallback (which calls + // tryCompleteBatch) and cleanup() in BatchQueue — see + // batch-queue/index.ts. + // Either way the goal — a durable batch that the SDK stops retrying — + // has been achieved, so we return sealed: true. + const currentBatch = await this._prisma.batchTaskRun.findFirst({ where: { id: batchId }, select: { id: true, @@ -290,13 +303,17 @@ export class StreamBatchItemsService extends WithRunEngine { }, }); - if (currentBatch?.sealed && currentBatch.status === "PROCESSING") { - // The batch was sealed by another request - this is fine, the goal was achieved - logger.info("Batch already sealed by concurrent request", { + if ( + (currentBatch?.sealed && currentBatch.status === "PROCESSING") || + currentBatch?.status === "COMPLETED" + ) { + logger.info("Batch already sealed/completed by concurrent path", { batchId: batchFriendlyId, itemsAccepted, itemsDeduplicated, envId: environment.id, + batchStatus: currentBatch.status, + batchSealed: currentBatch.sealed, }); span.setAttribute("itemsAccepted", itemsAccepted); diff --git a/apps/webapp/test/engine/streamBatchItems.test.ts b/apps/webapp/test/engine/streamBatchItems.test.ts index 2dee866876..6e5f2264b1 100644 --- a/apps/webapp/test/engine/streamBatchItems.test.ts +++ b/apps/webapp/test/engine/streamBatchItems.test.ts @@ -384,6 +384,130 @@ describe("StreamBatchItemsService", () => { } ); + containerTest( + "should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + disabled: true, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + batchQueue: { + redis: redisOptions, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Create a batch in PENDING state + const batch = await createBatch(prisma, authenticatedEnvironment.id, { + runCount: 2, + status: "PENDING", + sealed: false, + }); + + // Initialize the batch in Redis + await engine.initializeBatch({ + batchId: batch.id, + friendlyId: batch.friendlyId, + environmentId: authenticatedEnvironment.id, + environmentType: authenticatedEnvironment.type, + organizationId: authenticatedEnvironment.organizationId, + projectId: authenticatedEnvironment.projectId, + runCount: 2, + processingConcurrency: 10, + }); + + // Enqueue items - the enqueued count check passes but the seal updateMany + // will race with tryCompleteBatch moving status to COMPLETED. + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, { + task: "test-task", + payload: JSON.stringify({ data: "item1" }), + payloadType: "application/json", + }); + await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, { + task: "test-task", + payload: JSON.stringify({ data: "item2" }), + payloadType: "application/json", + }); + + // Simulate the race where BatchQueue's completionCallback runs + // tryCompleteBatch between getEnqueuedCount and the seal updateMany. + // tryCompleteBatch sets status=COMPLETED but NOT sealed=true. + const racingPrisma = { + ...prisma, + batchTaskRun: { + ...prisma.batchTaskRun, + findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun), + updateMany: async () => { + await prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + status: "COMPLETED", + }, + }); + // The conditional updateMany(where: status="PENDING") would now fail + return { count: 0 }; + }, + findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun), + }, + } as unknown as PrismaClient; + + const service = new StreamBatchItemsService({ + prisma: racingPrisma, + engine, + }); + + const result = await service.call( + authenticatedEnvironment, + batch.friendlyId, + itemsToAsyncIterable([]), + { + maxItemBytes: 1024 * 1024, + } + ); + + // The endpoint should accept the COMPLETED state as a success case so the + // SDK does not retry a batch whose child runs have already finished. + expect(result.sealed).toBe(true); + expect(result.id).toBe(batch.friendlyId); + + const updatedBatch = await prisma.batchTaskRun.findUnique({ + where: { id: batch.id }, + }); + + expect(updatedBatch?.status).toBe("COMPLETED"); + // sealed stays false because the BatchQueue completion path does not set + // it - that's fine, the batch is terminal. + expect(updatedBatch?.sealed).toBe(false); + + await engine.quit(); + } + ); + containerTest( "should throw error when race condition leaves batch in unexpected state", async ({ prisma, redisOptions }) => {