MCPcopy Index your code
hub / github.com/TanStack/ai / pump

Function pump

packages/ai-sandbox-cloudflare/src/coordinator.ts:242–275  ·  view source on GitHub ↗

* Replay-then-tail loop for one socket. Each delivered event advances the * socket's persisted cursor so a mid-stream reconnect resumes exactly once. * No-ops if a pump is already running for this socket (see pumping).

(socket: WebSocket, runId: string, fromSeq: number)

Source from the content-addressed store, hash-verified

240 * No-ops if a pump is already running for this socket (see {@link pumping}).
241 */
242 private pump(socket: WebSocket, runId: string, fromSeq: number): void {
243 if (this.pumping.has(socket)) return
244 this.pumping.add(socket)
245 const done = (async () => {
246 try {
247 for await (const event of this.controller.attach(runId, { fromSeq })) {
248 socket.send(JSON.stringify(event))
249 socket.serializeAttachment({
250 runId,
251 lastSeq: event.seq,
252 } satisfies SocketAttachment)
253 }
254 const record = await this.log.get(runId)
255 if (socket.readyState === WebSocket.OPEN) {
256 socket.send(JSON.stringify({ type: 'status', record }))
257 socket.close(1000, 'run complete')
258 }
259 } catch (error) {
260 // A tail loop throwing means a run-log read failed — an operator needs
261 // the full error, but the client only gets a truncated close reason.
262 const message = error instanceof Error ? error.message : String(error)
263 console.error(
264 `[sandbox-coordinator] tail failed for run ${runId}:`,
265 error,
266 )
267 if (socket.readyState === WebSocket.OPEN) {
268 socket.close(1011, message.slice(0, 120))
269 }
270 } finally {
271 this.pumping.delete(socket)
272 }
273 })()
274 this.ctx.waitUntil(done)
275 }
276
277 override webSocketMessage(
278 ws: WebSocket,

Callers

nothing calls this directly

Calls 9

hasMethod · 0.80
attachMethod · 0.80
serializeAttachmentMethod · 0.80
sendMethod · 0.65
getMethod · 0.65
closeMethod · 0.65
errorMethod · 0.65
deleteMethod · 0.65
waitUntilMethod · 0.65

Tested by

no test coverage detected