MCP · copy
hub / github.com/coder/mux / handleStreamEnd

Method handleStreamEnd

src/node/services/taskService.ts:7918–8319  ·  view source on GitHub ↗
(event: StreamEndEvent)

Source from the content-addressed store, hash-verified

7916 }
7917
7918 private async handleStreamEnd(event: StreamEndEvent): Promise<void> {
7919 const workspaceId = event.workspaceId;
7920
7921 // Ensure any in-flight notify_on_terminal persistence (from a just-detached foreground wait)
7922 // has settled so the config we read below reflects the durable non-blocking policy.
7923 if (this.pendingNotifyOnTerminalPersists.size > 0) {
7924 await Promise.all([...this.pendingNotifyOnTerminalPersists]);
7925 }
7926
7927 // The owner's own stream ending is the signal to retry any terminal wake-ups that were deferred
7928 // while it was busy. Drain checks idle internally and leaves notifications pending otherwise.
7929 this.scheduleTerminalAttentionDrain(workspaceId);
7930
7931 const cfg = this.config.loadConfigOrDefault();
7932 const entry = findWorkspaceEntry(cfg, workspaceId);
7933 if (!entry) return;
7934 const taskIndex = this.buildAgentTaskIndex(cfg);
7935
7936 // Parent workspaces must not end while they have active background tasks/workflows.
7937 // Enforce by auto-resuming the stream with a directive to await outstanding work.
7938 if (!entry.workspace.parentWorkspaceId) {
7939 const hasActiveDescendants = this.hasActiveDescendantAgentTasksUsingIndex(
7940 taskIndex,
7941 workspaceId
7942 );
7943 const referencedWorkflowRunIds = await this.listAgentReferencedWorkflowRunIds(
7944 workspaceId,
7945 event.parts,
7946 event.messageId
7947 );
7948 let activeWorkflowRunIds = await this.listActiveBackgroundWorkflowRunIds(
7949 workspaceId,
7950 referencedWorkflowRunIds
7951 );
7952 let activeWorkspaceTurnIds = await this.listActiveWorkspaceTurnTaskIdsForOwner(workspaceId);
7953 if (!hasActiveDescendants) {
7954 // Foreground best-of children can finish while the parent task tool call is still pending,
7955 // which temporarily blocks their leaf cleanup and may defer synthetic fallback delivery.
7956 // Recheck both once the parent stream reaches a descendant-free stream-end.
7957 await this.deliverDeferredBestOfReportsForParent(workspaceId);
7958 await this.requestReportedChildCleanupRechecks(workspaceId);
7959 if (activeWorkflowRunIds.length === 0 && activeWorkspaceTurnIds.length === 0) {
7960 if (await this.finalizeWorkspaceTurnFromStreamEnd(event)) {
7961 return;
7962 }
7963 this.consecutiveAutoResumes.delete(workspaceId);
7964 return;
7965 }
7966 }
7967
7968 // Workflow-owned descendants report through the workflow runner; parent nudges must not
7969 // bypass that journal/final-result path by asking the model to task_await those child tasks
7970 // directly. Instead, await the owning workflow run when one is still active.
7971 // Foreground waits can also be backgrounded at runtime when users queue another message.
7972 const listBlockingDescendantTaskIds = () =>
7973 this.listBlockingActiveDescendantAgentTaskIdsUsingIndex(taskIndex, workspaceId, {
7974 excludeWorkflowTasks: true,
7975 });

Callers 1

constructor · Method · 0.95

Tested by

no test coverage detected