MCPcopy Index your code
hub / github.com/TanStack/ai / pipeToRunLog

Function pipeToRunLog

packages/ai-sandbox/src/run.ts:66–100  ·  view source on GitHub ↗
(
  stream: AsyncIterable<StreamChunk>,
  opts: PipeToRunLogOptions,
)

Source from the content-addressed store, hash-verified

64 * - `signal` aborts mid-stream → stop consuming, `finish('aborted')`
65 */
export async function pipeToRunLog(
  stream: AsyncIterable<StreamChunk>,
  opts: PipeToRunLogOptions,
): Promise<RunRecord> {
  const { log, runId, threadId, signal } = opts

  // Open the run first so every exit path below has a record to finish and
  // re-read. `threadId` is only included when the caller supplied one.
  await log.open(threadId !== undefined ? { runId, threadId } : { runId })

  // Already cancelled before we started: never touch the stream.
  if (signal?.aborted) {
    await log.finish(runId, 'aborted')
    return reread(log, runId)
  }

  try {
    for await (const chunk of stream) {
      // Abort observed between chunks: drop the chunk we just received and
      // stop. Returning out of `for await` also closes the iterator.
      if (signal?.aborted) {
        await log.finish(runId, 'aborted')
        return reread(log, runId)
      }

      await log.append(runId, chunk)

      // An error chunk emitted by the stream itself is terminal: it is
      // already appended above, so only the run status needs recording.
      if (isRunErrorChunk(chunk)) {
        await log.finish(runId, 'error', runErrorFromChunk(chunk))
        return reread(log, runId)
      }
    }
  } catch (error) {
    // A stream wired to the same signal typically *throws* when it is
    // cancelled. That is still an abort, not a failure: finish as 'aborted'
    // exactly like the in-loop abort path, without a synthetic error chunk.
    if (signal?.aborted) {
      await log.finish(runId, 'aborted')
      return reread(log, runId)
    }

    // Detached run: no caller to throw to. Record the failure in the log so
    // tailing clients observe it, then return — do NOT rethrow.
    const message = messageOf(error)
    await log.append(runId, syntheticRunError(message))
    await log.finish(runId, 'error', { message })
    return reread(log, runId)
  }

  // Stream drained without an error chunk, a throw, or an abort.
  await log.finish(runId, 'done')
  return reread(log, runId)
}
101
102/** Re-read the now-terminal record; the run was just driven, so it must exist. */
103async function reread(log: RunEventLog, runId: string): Promise<RunRecord> {

Callers 2

run.test.tsFile · 0.90
startMethod · 0.85

Calls 8

rereadFunction · 0.85
runErrorFromChunkFunction · 0.85
messageOfFunction · 0.85
syntheticRunErrorFunction · 0.85
isRunErrorChunkFunction · 0.70
appendMethod · 0.65
openMethod · 0.45
finishMethod · 0.45

Tested by

no test coverage detected