MCPcopy Index your code
hub / github.com/TanStack/ai / tailRun

Function tailRun

examples/sandbox-cloudflare/src/routes/api.run.ts:178–266  ·  view source on GitHub ↗

Open the run's WebSocket tail (the coordinator's `fetch` returns a `101` with a `webSocket`) and yield each chat `StreamChunk` as it arrives. Resolves when the coordinator sends its terminal `status` frame (or the socket closes / the client disconnects).

(
  coordinator: Coordinator,
  runId: string,
  threadId: string,
  signal: AbortSignal,
)

Source from the content-addressed store, hash-verified

176 * disconnects).
177 */
178async function* tailRun(
179 coordinator: Coordinator,
180 runId: string,
181 threadId: string,
182 signal: AbortSignal,
183): AsyncGenerator<StreamChunk> {
184 // The host is irrelevant — the DO routes on the pathname; this is an in-process
185 // DO `fetch`, not a public request.
186 const streamUrl = `https://do/runs/${runId}/stream?threadId=${encodeURIComponent(threadId)}&lastSeq=-1`
187 const res = await coordinator.fetch(streamUrl, {
188 headers: { Upgrade: 'websocket' },
189 })
190 const socket = res.webSocket
191 if (!socket) {
192 throw new Error(
193 `agent stream did not upgrade to a WebSocket (status ${res.status})`,
194 )
195 }
196 socket.accept()
197
198 const queue: Array<StreamChunk> = []
199 // Mutated from the socket/abort callbacks below. Held on an object (rather than
200 // bare `let`s) so the generator loop's checks aren't flagged as constant.
201 const state: { finished: boolean; failure: Error | null } = {
202 finished: false,
203 failure: null,
204 }
205 let wake: (() => void) | null = null
206 const signalReady = () => {
207 wake?.()
208 wake = null
209 }
210
211 socket.addEventListener('message', (event: MessageEvent) => {
212 let parsed: unknown
213 try {
214 parsed = JSON.parse(typeof event.data === 'string' ? event.data : '')
215 } catch {
216 return
217 }
218 if (parsed === null || typeof parsed !== 'object') return
219 if ('type' in parsed && parsed.type === 'status') {
220 state.finished = true
221 } else if ('chunk' in parsed) {
222 queue.push(parsed.chunk as StreamChunk)
223 }
224 signalReady()
225 })
226 socket.addEventListener('close', () => {
227 state.finished = true
228 signalReady()
229 })
230 socket.addEventListener('error', () => {
231 state.failure = new Error('agent stream socket error')
232 state.finished = true
233 signalReady()
234 })
235 const onAbort = () => {

Callers 1

api.run.tsFile · 0.85

Calls 7

signalReadyFunction · 0.85
acceptMethod · 0.80
parseMethod · 0.80
fetchMethod · 0.65
closeMethod · 0.65
addEventListenerMethod · 0.45
pushMethod · 0.45

Tested by

no test coverage detected