MCPcopy Index your code
hub / github.com/TanStack/ai / consumeSubscription

Method consumeSubscription

packages/ai-client/src/chat-client.ts:671–702  ·  view source on GitHub ↗

* Consume chunks from the connection subscription.

(signal: AbortSignal)

Source from the content-addressed store, hash-verified

669 * Consume chunks from the connection subscription.
670 */
671 private async consumeSubscription(signal: AbortSignal): Promise<void> {
672 const stream = this.connection.subscribe(signal)
673 for await (const chunk of stream) {
674 if (signal.aborted) break
675 if (this.connectionStatus === 'connecting') {
676 this.setConnectionStatus('connected')
677 }
678 const shouldIgnore = this.persistor?.shouldIgnoreChunk(chunk) ?? false
679 if (shouldIgnore) {
680 if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') {
681 if (getChunkRunId(chunk)) {
682 this.updateRunLifecycle(chunk, { resolveProcessing: false })
683 } else {
684 this.drainIgnoredRunlessChunk(chunk)
685 }
686 }
687 continue
688 }
689 this.callbacksRef.current.onChunk(chunk)
690 this.devtoolsBridge.observeChunk(chunk)
691 this.processor.processChunk(chunk)
692 // Run lifecycle (active-run tracking, session-generating state, and
693 // processing resolution for RUN_FINISHED / RUN_ERROR) is handled in a
694 // single place so the ignored-chunk path above and this path can't
695 // diverge. RUN_ERROR carries its runId via the AG-UI passthrough so a
696 // per-run error only clears that run, while a runId-less RUN_ERROR is
697 // treated as a session-level error that clears every active run.
698 this.updateRunLifecycle(chunk)
699 // Yield control back to event loop for UI updates
700 await new Promise((resolve) => setTimeout(resolve, 0))
701 }
702 }
703
704 /**
705 * Ensure subscription loop is running, starting it if needed.

Callers 1

startSubscriptionMethod · 0.95

Calls 8

setConnectionStatusMethod · 0.95
updateRunLifecycleMethod · 0.95
getChunkRunIdFunction · 0.90
shouldIgnoreChunkMethod · 0.80
processChunkMethod · 0.80
subscribeMethod · 0.65
observeChunkMethod · 0.45

Tested by

no test coverage detected