(
sessionId: string,
workspaceId: string,
onConnected: () => void,
onChatMode: OnChatMode,
subscriptionToken: symbol
)
| 1527 | } |
| 1528 | |
| 1529 | private async runChatSubscription( |
| 1530 | sessionId: string, |
| 1531 | workspaceId: string, |
| 1532 | onConnected: () => void, |
| 1533 | onChatMode: OnChatMode, |
| 1534 | subscriptionToken: symbol |
| 1535 | ): Promise<void> { |
| 1536 | const chatStream = await this.server.client.workspace.onChat({ |
| 1537 | workspaceId, |
| 1538 | mode: onChatMode, |
| 1539 | }); |
| 1540 | onConnected(); |
| 1541 | this.touchSession(sessionId); |
| 1542 | |
| 1543 | // Decouple turn resolution from sessionUpdate writes. |
| 1544 | // handleStreamEvent must fire for every event regardless of stdout |
| 1545 | // backpressure. Without this decoupling, stream-end can stall behind |
| 1546 | // a blocked sessionUpdate write, preventing resolveTurn from firing |
| 1547 | // and causing prompt() to hang indefinitely. |
| 1548 | // |
| 1549 | // Keep memory bounded without stalling terminal-event observation. |
| 1550 | // |
| 1551 | // If stdout backpressure saturates the forwarding queue and we block the |
| 1552 | // drain loop, stream-end/stream-abort can be delayed behind thousands of |
| 1553 | // queued deltas, making prompt() appear hung. Instead, once saturated we |
| 1554 | // drop only high-volume chunk/replay events while preserving lifecycle/ |
| 1555 | // control events (caught-up, tool-call-end, etc.) so turn-completion and |
| 1556 | // translator state remain correct. |
| 1557 | const { push, iterate, end } = createAsyncMessageQueue<WorkspaceChatMessage>(); |
| 1558 | let queuedEventCount = 0; |
| 1559 | let droppedNonTerminalEventCount = 0; |
| 1560 | let hasCaughtUp = onChatMode.type !== "full"; |
| 1561 | |
| 1562 | /* Returns true when event.type is "stream-end", "stream-abort", "stream-error", or "error" — the event types this code treats as terminal. */ const isTerminalStreamEvent = (event: WorkspaceChatMessage): boolean => |
| 1563 | event.type === "stream-end" || |
| 1564 | event.type === "stream-abort" || |
| 1565 | event.type === "stream-error" || |
| 1566 | event.type === "error"; |
| 1567 | |
| 1568 | const isDroppableUnderBackpressure = (event: WorkspaceChatMessage): boolean => { |
| 1569 | const isReplayMessageEvent = |
| 1570 | event.type === "message" && |
| 1571 | ((event as { replay?: boolean }).replay === true || !hasCaughtUp); |
| 1572 | |
| 1573 | return ( |
| 1574 | event.type === "stream-delta" || |
| 1575 | event.type === "reasoning-delta" || |
| 1576 | event.type === "tool-call-delta" || |
| 1577 | event.type === "usage-delta" || |
| 1578 | event.type === "session-usage-delta" || |
| 1579 | event.type === "advisor-output" || |
| 1580 | event.type === "advisor-reasoning-output" || |
| 1581 | event.type === "bash-output" || |
| 1582 | event.type === "init-output" || |
| 1583 | // Drop replay history messages under saturation, but keep live message |
| 1584 | // events so ACP clients do not miss real-time conversation updates. |
| 1585 | isReplayMessageEvent |
| 1586 | ); |
no test coverage detected