(
workspaceId: string,
aggregator: StreamingMessageAggregator,
data: WorkspaceChatMessage
)
| 4240 | } |
| 4241 | |
| 4242 | private processStreamEvent( |
| 4243 | workspaceId: string, |
| 4244 | aggregator: StreamingMessageAggregator, |
| 4245 | data: WorkspaceChatMessage |
| 4246 | ): void { |
| 4247 | // Handle special events first |
| 4248 | if (isStreamError(data)) { |
| 4249 | const transient = this.assertChatTransientState(workspaceId); |
| 4250 | |
| 4251 | // Suppress side effects during buffered replay (we're just hydrating UI state), but allow |
| 4252 | // live errors to trigger mux-gateway session-expired handling even before we're "caught up". |
| 4253 | // In particular, mux-gateway 401s can surface as a pre-stream stream-error (before any |
| 4254 | // stream-start) during startup/reconnect. |
| 4255 | const allowSideEffects = !transient.replayingHistory; |
| 4256 | |
| 4257 | applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects }); |
| 4258 | |
| 4259 | // stream-error is a terminal stream event, just like stream-end/stream-abort. |
| 4260 | // Mirror their cleanup so subscribers don't see stale streaming stats from the |
| 4261 | // failed stream — applyWorkspaceChatEventToAggregator already cleared token |
| 4262 | // state; we now flush any pending coalesced bump and invalidate the |
| 4263 | // streamingStatsStore cache so useWorkspaceStreamingStats returns null. |
| 4264 | this.cancelPendingIdleBump(workspaceId); |
| 4265 | this.cancelPendingStreamingBump(workspaceId); |
| 4266 | this.states.bump(workspaceId); |
| 4267 | this.streamingStatsStore.bump(workspaceId); |
| 4268 | return; |
| 4269 | } |
| 4270 | |
| 4271 | if (isDeleteMessage(data)) { |
| 4272 | applyWorkspaceChatEventToAggregator(aggregator, data); |
| 4273 | this.cleanupStaleLiveToolState(workspaceId, aggregator); |
| 4274 | this.states.bump(workspaceId); |
| 4275 | this.checkAndBumpRecencyIfChanged(); |
| 4276 | this.usageStore.bump(workspaceId); |
| 4277 | this.consumerManager.scheduleCalculation(workspaceId, aggregator); |
| 4278 | return; |
| 4279 | } |
| 4280 | |
| 4281 | if (isBashOutputEvent(data)) { |
| 4282 | if (data.text.length === 0) return; |
| 4283 | |
| 4284 | const transient = this.assertChatTransientState(workspaceId); |
| 4285 | |
| 4286 | const prev = transient.liveBashOutput.get(data.toolCallId); |
| 4287 | const next = appendLiveBashOutputChunk( |
| 4288 | prev, |
| 4289 | { text: data.text, isError: data.isError }, |
| 4290 | BASH_TRUNCATE_MAX_TOTAL_BYTES |
| 4291 | ); |
| 4292 | |
| 4293 | // Avoid unnecessary re-renders if this event didn't change the stored state. |
| 4294 | if (next === prev) return; |
| 4295 | |
| 4296 | transient.liveBashOutput.set(data.toolCallId, next); |
| 4297 | |
| 4298 | // High-frequency: throttle UI updates like other delta-style events. |
| 4299 | this.scheduleIdleStateBump(workspaceId); |
no test coverage detected