MCPcopy
hub / github.com/coder/mux / setupStreamEventForwarding

Method setupStreamEventForwarding

src/node/services/aiService.ts:626–715  ·  view source on GitHub ↗

Forwards all stream events from StreamManager to AIService consumers.

()

Source from the content-addressed store, hash-verified

624 * Forward all stream events from StreamManager to AIService consumers
625 */
626 private setupStreamEventForwarding(): void {
627 // Simple one-to-one event forwarding from StreamManager → AIService consumers
628 for (const event of [
629 "stream-start",
630 "stream-delta",
631 "tool-call-start",
632 "tool-call-delta",
633 "tool-call-end",
634 "reasoning-delta",
635 "reasoning-end",
636 "workflow-run-attached",
637 "usage-delta",
638 ] as const) {
639 this.streamManager.on(event, (data) => this.emit(event, data));
640 }
641
642 // Stream errors can bypass stream-end/stream-abort. Clear any queued metadata
643 // so failed requests don't leak pending-run tracking entries.
644 this.streamManager.on("error", (data: ErrorEvent) => {
645 this.clearTrackedPendingDevToolsRunMetadata(data.messageId);
646 this.emit("error", data);
647 });
648
649 // stream-end needs extra logic: capture provider response for debug modal
650 this.streamManager.on("stream-end", (data: StreamEndEvent) => {
651 // Streams can end before DevTools middleware creates a run (for example when
652 // interrupted early). Clear any still-queued run metadata for this message.
653 this.clearTrackedPendingDevToolsRunMetadata(data.messageId);
654
655 // Best-effort capture of the provider response for the "Last LLM request" debug modal.
656 // Must never break live streaming.
657 try {
658 const snapshot = this.lastLlmRequestByWorkspace.get(data.workspaceId);
659 if (snapshot) {
660 // If messageId is missing (legacy fixtures), attach anyway.
661 const shouldAttach = snapshot.messageId === data.messageId || snapshot.messageId == null;
662 if (shouldAttach) {
663 const updated: DebugLlmRequestSnapshot = {
664 ...snapshot,
665 response: {
666 capturedAt: Date.now(),
667 metadata: data.metadata,
668 parts: data.parts,
669 },
670 };
671
672 this.lastLlmRequestByWorkspace.set(data.workspaceId, structuredClone(updated));
673 }
674 }
675 } catch (error) {
676 const errMsg = getErrorMessage(error);
677 log.warn("Failed to capture debug LLM response snapshot", { error: errMsg });
678 }
679
680 this.emit("stream-end", data);
681 });
682
683 // Handle stream-abort: dispose of partial based on abandonPartial flag

Callers 1

constructor · Method · 0.95

Calls 9

getErrorMessage · Function · 0.90
on · Method · 0.80
set · Method · 0.80
deletePartial · Method · 0.80
readPartial · Method · 0.80
commitPartial · Method · 0.80
emit · Method · 0.65
get · Method · 0.65

Tested by

no test coverage detected