* Consume chunks from the connection subscription.
(signal: AbortSignal)
| 669 | * Consume chunks from the connection subscription. |
| 670 | */ |
| 671 | private async consumeSubscription(signal: AbortSignal): Promise<void> { |
| 672 | const stream = this.connection.subscribe(signal) |
| 673 | for await (const chunk of stream) { |
| 674 | if (signal.aborted) break |
| 675 | if (this.connectionStatus === 'connecting') { |
| 676 | this.setConnectionStatus('connected') |
| 677 | } |
| 678 | const shouldIgnore = this.persistor?.shouldIgnoreChunk(chunk) ?? false |
| 679 | if (shouldIgnore) { |
| 680 | if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') { |
| 681 | if (getChunkRunId(chunk)) { |
| 682 | this.updateRunLifecycle(chunk, { resolveProcessing: false }) |
| 683 | } else { |
| 684 | this.drainIgnoredRunlessChunk(chunk) |
| 685 | } |
| 686 | } |
| 687 | continue |
| 688 | } |
| 689 | this.callbacksRef.current.onChunk(chunk) |
| 690 | this.devtoolsBridge.observeChunk(chunk) |
| 691 | this.processor.processChunk(chunk) |
| 692 | // Run lifecycle (active-run tracking, session-generating state, and |
| 693 | // processing resolution for RUN_FINISHED / RUN_ERROR) is handled in a |
| 694 | // single place so the ignored-chunk path above and this path can't |
| 695 | // diverge. RUN_ERROR carries its runId via the AG-UI passthrough so a |
| 696 | // per-run error only clears that run, while a runId-less RUN_ERROR is |
| 697 | // treated as a session-level error that clears every active run. |
| 698 | this.updateRunLifecycle(chunk) |
| 699 | // Yield control back to event loop for UI updates |
| 700 | await new Promise((resolve) => setTimeout(resolve, 0)) |
| 701 | } |
| 702 | } |
| 703 | |
| 704 | /** |
| 705 | * Ensure subscription loop is running, starting it if needed. |
no test coverage detected