diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index c348aa401f..6ff5be9e0b 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -34,6 +34,7 @@ import { type ExecutionContext, getNextExecutionOrder, type NormalizedBlockOutput, + type StreamingExecution, } from '@/executor/types' import { streamingResponseFormatProcessor } from '@/executor/utils' import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors' @@ -140,7 +141,7 @@ export class BlockExecutor { let normalizedOutput: NormalizedBlockOutput if (isStreamingExecution) { - const streamingExec = output as { stream: ReadableStream; execution: any } + const streamingExec = output as StreamingExecution if (ctx.onStream) { await this.handleStreamingExecution( @@ -602,7 +603,7 @@ export class BlockExecutor { ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, - streamingExec: { stream: ReadableStream; execution: any }, + streamingExec: StreamingExecution, resolvedInputs: Record, selectedOutputs: string[] ): Promise { @@ -613,56 +614,39 @@ export class BlockExecutor { (block.config?.params as Record | undefined)?.responseFormat ?? (block.config as Record | undefined)?.responseFormat - const stream = streamingExec.stream - if (typeof stream.tee !== 'function') { - await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs) - return - } + const sourceReader = streamingExec.stream.getReader() + const decoder = new TextDecoder() + const accumulated: string[] = [] + let drainError: unknown + let sourceFullyDrained = false - const [clientStream, executorStream] = stream.tee() + const clientSource = new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await sourceReader.read() + if (done) { + const tail = decoder.decode() + if (tail) accumulated.push(tail) + sourceFullyDrained = true + controller.close() + return + } + accumulated.push(decoder.decode(value, { stream: true })) + controller.enqueue(value) + } catch (error) { + drainError = error + controller.error(error) + } + }, + async cancel(reason) { + try { + await sourceReader.cancel(reason) + } catch {} + }, + }) const processedClientStream = streamingResponseFormatProcessor.processStream( - clientStream, - blockId, - selectedOutputs, - responseFormat - ) - - const clientStreamingExec = { - ...streamingExec, - stream: processedClientStream, - } - - const executorConsumption = this.consumeExecutorStream( - executorStream, - streamingExec, - blockId, - responseFormat - ) - - const clientConsumption = (async () => { - try { - await ctx.onStream?.(clientStreamingExec) - } catch (error) { - this.execLogger.error('Error in onStream callback', { blockId, error }) - // Cancel the client stream to release the tee'd buffer - await processedClientStream.cancel().catch(() => {}) - } - })() - - await Promise.all([clientConsumption, executorConsumption]) - } - - private async forwardStream( - ctx: ExecutionContext, - blockId: string, - streamingExec: { stream: ReadableStream; execution: any }, - stream: ReadableStream, - responseFormat: any, - selectedOutputs: string[] - ): Promise { - const processedStream = streamingResponseFormatProcessor.processStream( - stream, + clientSource, blockId, selectedOutputs, responseFormat @@ -670,72 +654,75 @@ export class BlockExecutor { try { await ctx.onStream?.({ - ...streamingExec, - stream: processedStream, + stream: processedClientStream, + execution: streamingExec.execution, }) } catch (error) { this.execLogger.error('Error in onStream callback', { blockId, error }) - await processedStream.cancel().catch(() => {}) - } - } - - private async consumeExecutorStream( - stream: ReadableStream, - streamingExec: { execution: any }, - blockId: string, - responseFormat: any - ): Promise { - const reader = stream.getReader() - const decoder = new TextDecoder() - const chunks: string[] = [] - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - chunks.push(decoder.decode(value, { stream: true })) - } - const tail = decoder.decode() - if (tail) chunks.push(tail) - } catch (error) { - this.execLogger.error('Error reading executor stream for block', { blockId, error }) + await processedClientStream.cancel().catch(() => {}) } finally { try { - await reader.cancel().catch(() => {}) + sourceReader.releaseLock() } catch {} } - const fullContent = chunks.join('') + if (drainError) { + this.execLogger.error('Error reading stream for block', { blockId, error: drainError }) + return + } + + // If the onStream consumer exited before the source drained (e.g. it caught + // an internal error and returned normally), `accumulated` holds a truncated + // response. Persisting that to memory or setting it as the block output + // would corrupt downstream state — skip and log instead. + if (!sourceFullyDrained) { + this.execLogger.warn( + 'Stream consumer exited before source drained; skipping content persistence', + { + blockId, + } + ) + return + } + + const fullContent = accumulated.join('') if (!fullContent) { return } const executionOutput = streamingExec.execution?.output - if (!executionOutput || typeof executionOutput !== 'object') { - return + if (executionOutput && typeof executionOutput === 'object') { + let parsedForFormat = false + if (responseFormat) { + try { + const parsed = JSON.parse(fullContent.trim()) + streamingExec.execution.output = { + ...parsed, + tokens: executionOutput.tokens, + toolCalls: executionOutput.toolCalls, + providerTiming: executionOutput.providerTiming, + cost: executionOutput.cost, + model: executionOutput.model, + } + parsedForFormat = true + } catch (error) { + this.execLogger.warn('Failed to parse streamed content for response format', { + blockId, + error, + }) + } + } + if (!parsedForFormat) { + executionOutput.content = fullContent + } } - if (responseFormat) { + if (streamingExec.onFullContent) { try { - const parsed = JSON.parse(fullContent.trim()) - - streamingExec.execution.output = { - ...parsed, - tokens: executionOutput.tokens, - toolCalls: executionOutput.toolCalls, - providerTiming: executionOutput.providerTiming, - cost: executionOutput.cost, - model: executionOutput.model, - } - return + await streamingExec.onFullContent(fullContent) } catch (error) { - this.execLogger.warn('Failed to parse streamed content for response format', { - blockId, - error, - }) + this.execLogger.error('onFullContent callback failed', { blockId, error }) } } - - executionOutput.content = fullContent } } diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index fcf43b6844..a77807b843 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler { streamingExec: StreamingExecution ): StreamingExecution { return { - stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs), + stream: streamingExec.stream, execution: streamingExec.execution, + onFullContent: async (content: string) => { + if (!content.trim()) return + try { + await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content }) + } catch (error) { + logger.error('Failed to persist streaming response:', error) + } + }, } } diff --git a/apps/sim/executor/handlers/agent/memory.ts b/apps/sim/executor/handlers/agent/memory.ts index 6428f0b760..fcba362822 100644 --- a/apps/sim/executor/handlers/agent/memory.ts +++ b/apps/sim/executor/handlers/agent/memory.ts @@ -111,35 +111,6 @@ export class Memory { }) } - wrapStreamForPersistence( - stream: ReadableStream, - ctx: ExecutionContext, - inputs: AgentInputs - ): ReadableStream { - const chunks: string[] = [] - const decoder = new TextDecoder() - - const transformStream = new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue(chunk) - const decoded = decoder.decode(chunk, { stream: true }) - chunks.push(decoded) - }, - - flush: () => { - const content = chunks.join('') - if (content.trim()) { - this.appendToMemory(ctx, inputs, { - role: 'assistant', - content, - }).catch((error) => logger.error('Failed to persist streaming response:', error)) - } - }, - }) - - return stream.pipeThrough(transformStream) - } - private requireWorkspaceId(ctx: ExecutionContext): string { if (!ctx.workspaceId) { throw new Error('workspaceId is required for memory operations') diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 00caff1d9e..b8b0d4cfa0 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -359,6 +359,12 @@ export interface ExecutionResult { export interface StreamingExecution { stream: ReadableStream execution: ExecutionResult & { isStreaming?: boolean } + /** + * Invoked with the assembled response text after the stream drains. Lets agent + * blocks persist the full response without interposing a TransformStream on a + * fetch-backed source — that pattern amplifies memory on Bun via #28035. + */ + onFullContent?: (content: string) => void | Promise } export interface BlockExecutor {