MCPcopy
hub / github.com/coder/mux / runChatSubscription

Method runChatSubscription

src/node/acp/agent.ts:1529–1706  ·  view source on GitHub ↗
(
    sessionId: string,
    workspaceId: string,
    onConnected: () => void,
    onChatMode: OnChatMode,
    subscriptionToken: symbol
  )

Source from the content-addressed store, hash-verified

1527 }
1528
1529 private async runChatSubscription(
1530 sessionId: string,
1531 workspaceId: string,
1532 onConnected: () => void,
1533 onChatMode: OnChatMode,
1534 subscriptionToken: symbol
1535 ): Promise<void> {
1536 const chatStream = await this.server.client.workspace.onChat({
1537 workspaceId,
1538 mode: onChatMode,
1539 });
1540 onConnected();
1541 this.touchSession(sessionId);
1542
1543 // Decouple turn resolution from sessionUpdate writes.
1544 // handleStreamEvent must fire for every event regardless of stdout
1545 // backpressure. Without this decoupling, stream-end can stall behind
1546 // a blocked sessionUpdate write, preventing resolveTurn from firing
1547 // and causing prompt() to hang indefinitely.
1548 //
1549 // Keep memory bounded without stalling terminal-event observation.
1550 //
1551 // If stdout backpressure saturates the forwarding queue and we block the
1552 // drain loop, stream-end/stream-abort can be delayed behind thousands of
1553 // queued deltas, making prompt() appear hung. Instead, once saturated we
1554 // drop only high-volume chunk/replay events while preserving lifecycle/
1555 // control events (caught-up, tool-call-end, etc.) so turn-completion and
1556 // translator state remain correct.
1557 const { push, iterate, end } = createAsyncMessageQueue<WorkspaceChatMessage>();
1558 let queuedEventCount = 0;
1559 let droppedNonTerminalEventCount = 0;
1560 let hasCaughtUp = onChatMode.type !== "full";
1561
1562 const isTerminalStreamEvent = (event: WorkspaceChatMessage): boolean =>
1563 event.type === "stream-end" ||
1564 event.type === "stream-abort" ||
1565 event.type === "stream-error" ||
1566 event.type === "error";
1567
1568 const isDroppableUnderBackpressure = (event: WorkspaceChatMessage): boolean => {
1569 const isReplayMessageEvent =
1570 event.type === "message" &&
1571 ((event as { replay?: boolean }).replay === true || !hasCaughtUp);
1572
1573 return (
1574 event.type === "stream-delta" ||
1575 event.type === "reasoning-delta" ||
1576 event.type === "tool-call-delta" ||
1577 event.type === "usage-delta" ||
1578 event.type === "session-usage-delta" ||
1579 event.type === "advisor-output" ||
1580 event.type === "advisor-reasoning-output" ||
1581 event.type === "bash-output" ||
1582 event.type === "init-output" ||
1583 // Drop replay history messages under saturation, but keep live message
1584 // events so ACP clients do not miss real-time conversation updates.
1585 isReplayMessageEvent
1586 );

Callers 1

Calls 12

touchSessionMethod · 0.95
handleStreamEventMethod · 0.95
rejectTurnMethod · 0.95
createAsyncMessageQueueFunction · 0.90
setMethod · 0.80
consumeAndForwardMethod · 0.80
returnMethod · 0.80
getMethod · 0.65
pushFunction · 0.50
endFunction · 0.50
deleteMethod · 0.45

Tested by

no test coverage detected