Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions apps/webapp/app/runEngine/services/streamBatchItems.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,18 @@ export class StreamBatchItemsService extends WithRunEngine {
// Validate we received the expected number of items
if (enqueuedCount !== batch.runCount) {
// The batch queue consumers may have already processed all items and
// cleaned up the Redis keys before we got here (especially likely when
// items include pre-failed runs that complete instantly). Check if the
// batch was already sealed/completed in Postgres.
const currentBatch = await this._prisma.batchTaskRun.findUnique({
// cleaned up the Redis keys before we got here. This happens when all
// runs complete fast enough that cleanup() deletes the enqueuedItemsKey
// before we read it — typically when the last item executes in the
// milliseconds between the loop ending and getBatchEnqueuedCount() being called.
// Check both sealed (set by this endpoint on a concurrent request) and
// COMPLETED (set by the BatchQueue completion path before we got here; that
// path does not set sealed=true).
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
const currentBatch = await this._prisma.batchTaskRun.findFirst({
where: { id: batchId },
select: { sealed: true, status: true },
});

if (currentBatch?.sealed) {
if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
logger.info("Batch already sealed before count check (fast completion)", {
batchId: batchFriendlyId,
itemsAccepted,
Expand Down Expand Up @@ -279,8 +282,18 @@ export class StreamBatchItemsService extends WithRunEngine {

// Check if we won the race to seal the batch
if (sealResult.count === 0) {
// Another request sealed the batch first - re-query to check current state
const currentBatch = await this._prisma.batchTaskRun.findUnique({
// The conditional update failed because the batch was no longer in
// PENDING status. Re-query to determine which path got there first:
// - A concurrent streaming request already sealed and moved it to
// PROCESSING.
// - The BatchQueue completion path finished all runs and set it to
// COMPLETED (without setting sealed=true — that's this endpoint's
// job). This window exists between completionCallback (which calls
// tryCompleteBatch) and cleanup() in BatchQueue — see
// batch-queue/index.ts.
// Either way the goal — a durable batch that the SDK stops retrying —
// has been achieved, so we return sealed: true.
const currentBatch = await this._prisma.batchTaskRun.findFirst({
where: { id: batchId },
select: {
id: true,
Expand All @@ -290,13 +303,17 @@ export class StreamBatchItemsService extends WithRunEngine {
},
});

if (currentBatch?.sealed && currentBatch.status === "PROCESSING") {
// The batch was sealed by another request - this is fine, the goal was achieved
logger.info("Batch already sealed by concurrent request", {
if (
(currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
currentBatch?.status === "COMPLETED"
) {
logger.info("Batch already sealed/completed by concurrent path", {
batchId: batchFriendlyId,
itemsAccepted,
itemsDeduplicated,
envId: environment.id,
batchStatus: currentBatch.status,
batchSealed: currentBatch.sealed,
});

span.setAttribute("itemsAccepted", itemsAccepted);
Expand Down
124 changes: 124 additions & 0 deletions apps/webapp/test/engine/streamBatchItems.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,130 @@ describe("StreamBatchItemsService", () => {
}
);

// Regression test: the batch row is moved to COMPLETED (with sealed left false)
// at the moment the service attempts its conditional seal update, so that update
// matches zero rows. The service must still report sealed=true instead of
// throwing, so the SDK does not retry a batch whose runs have already finished.
containerTest(
  "should return sealed=true when batch is COMPLETED by BatchQueue before seal attempt",
  async ({ prisma, redisOptions }) => {
    const engine = new RunEngine({
      prisma,
      worker: {
        redis: redisOptions,
        workers: 1,
        tasksPerWorker: 10,
        pollIntervalMs: 100,
        disabled: true,
      },
      queue: {
        redis: redisOptions,
      },
      runLock: {
        redis: redisOptions,
      },
      machines: {
        defaultMachine: "small-1x",
        machines: {
          "small-1x": {
            name: "small-1x" as const,
            cpu: 0.5,
            memory: 0.5,
            centsPerMs: 0.0001,
          },
        },
        baseCostInCents: 0.0005,
      },
      batchQueue: {
        redis: redisOptions,
      },
      tracer: trace.getTracer("test", "0.0.0"),
    });

    // Shut the engine down even when a setup step or an assertion throws, so a
    // failing run does not skip the engine.quit() call.
    try {
      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

      // Create a batch in PENDING state
      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
        runCount: 2,
        status: "PENDING",
        sealed: false,
      });

      // Initialize the batch in Redis
      await engine.initializeBatch({
        batchId: batch.id,
        friendlyId: batch.friendlyId,
        environmentId: authenticatedEnvironment.id,
        environmentType: authenticatedEnvironment.type,
        organizationId: authenticatedEnvironment.organizationId,
        projectId: authenticatedEnvironment.projectId,
        runCount: 2,
        processingConcurrency: 10,
      });

      // Enqueue both items up front - the enqueued count check passes but the
      // seal updateMany will race with tryCompleteBatch moving status to COMPLETED.
      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
        task: "test-task",
        payload: JSON.stringify({ data: "item1" }),
        payloadType: "application/json",
      });
      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
        task: "test-task",
        payload: JSON.stringify({ data: "item2" }),
        payloadType: "application/json",
      });

      // Simulate the race where BatchQueue's completionCallback runs
      // tryCompleteBatch between the enqueued-count check and the seal updateMany.
      // tryCompleteBatch sets status=COMPLETED but NOT sealed=true.
      const racingPrisma = {
        ...prisma,
        batchTaskRun: {
          ...prisma.batchTaskRun,
          findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun),
          findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun),
          // Stand-in for the service's conditional seal: flip the row to
          // COMPLETED through the real client, then report zero rows updated.
          updateMany: async () => {
            await prisma.batchTaskRun.update({
              where: { id: batch.id },
              data: {
                status: "COMPLETED",
              },
            });
            // The conditional updateMany(where: status="PENDING") would now fail
            return { count: 0 };
          },
        },
      } as unknown as PrismaClient;

      const service = new StreamBatchItemsService({
        prisma: racingPrisma,
        engine,
      });

      // No items are streamed here; both were enqueued directly on the engine above.
      const result = await service.call(
        authenticatedEnvironment,
        batch.friendlyId,
        itemsToAsyncIterable([]),
        {
          maxItemBytes: 1024 * 1024,
        }
      );

      // The endpoint should accept the COMPLETED state as a success case so the
      // SDK does not retry a batch whose child runs have already finished.
      expect(result.sealed).toBe(true);
      expect(result.id).toBe(batch.friendlyId);

      const updatedBatch = await prisma.batchTaskRun.findUnique({
        where: { id: batch.id },
      });

      expect(updatedBatch?.status).toBe("COMPLETED");
      // sealed stays false because the BatchQueue completion path does not set
      // it - that's fine, the batch is terminal.
      expect(updatedBatch?.sealed).toBe(false);
    } finally {
      await engine.quit();
    }
  }
);

containerTest(
"should throw error when race condition leaves batch in unexpected state",
async ({ prisma, redisOptions }) => {
Expand Down
Loading