MCPcopy Index your code
hub / github.com/coder/mux / startStream

Method startStream

src/node/services/streamManager.ts:3625–3775  ·  view source on GitHub ↗

* Starts a new stream for a workspace, automatically cancelling any existing stream * * Uses per-workspace mutex to prevent concurrent streams. The mutex ensures: * 1. Only one startStream can execute at a time per workspace * 2. Old stream fully exits before new stream starts * 3. No

(
    workspaceId: string,
    messages: ModelMessage[],
    model: LanguageModel,
    modelString: string,
    historySequence: number,
    system: string,
    runtime: Runtime,
    messageId: string,
    abortSignal?: AbortSignal,
    tools?: Record<string, Tool>,
    initialMetadata?: Partial<MuxMetadata>,
    providerOptions?: Record<string, unknown>,
    maxOutputTokens?: number,
    toolPolicy?: ToolPolicy,
    providedStreamToken?: StreamToken,
    hasQueuedMessages?: (dispatchMode?: "tool-end" | "turn-end") => boolean,
    workspaceName?: string,
    thinkingLevel?: string,
    headers?: Record<string, string | undefined>,
    anthropicCacheTtlOverride?: AnthropicCacheTtl,
    callSettingsOverrides?: ResolvedCallSettingsOverrides,
    onChunk?: StreamTextOnChunk,
    onStepMessages?: (messages: ModelMessage[]) => void,
    providedRuntimeTempDir?: string,
    modelFallback?: ModelFallbackOptions
  )

Source from the content-addressed store, hash-verified

3623 * 3. No race conditions in stream registration or cleanup
3624 */
3625 async startStream(
3626 workspaceId: string,
3627 messages: ModelMessage[],
3628 model: LanguageModel,
3629 modelString: string,
3630 historySequence: number,
3631 system: string,
3632 runtime: Runtime,
3633 messageId: string,
3634 abortSignal?: AbortSignal,
3635 tools?: Record<string, Tool>,
3636 initialMetadata?: Partial<MuxMetadata>,
3637 providerOptions?: Record<string, unknown>,
3638 maxOutputTokens?: number,
3639 toolPolicy?: ToolPolicy,
3640 providedStreamToken?: StreamToken,
3641 hasQueuedMessages?: (dispatchMode?: "tool-end" | "turn-end") => boolean,
3642 workspaceName?: string,
3643 thinkingLevel?: string,
3644 headers?: Record<string, string | undefined>,
3645 anthropicCacheTtlOverride?: AnthropicCacheTtl,
3646 callSettingsOverrides?: ResolvedCallSettingsOverrides,
3647 onChunk?: StreamTextOnChunk,
3648 onStepMessages?: (messages: ModelMessage[]) => void,
3649 providedRuntimeTempDir?: string,
3650 modelFallback?: ModelFallbackOptions
3651 ): Promise<Result<StreamToken, SendMessageError>> {
3652 const typedWorkspaceId = workspaceId as WorkspaceId;
3653
3654 if (messages.length === 0) {
3655 return Err({
3656 type: "unknown",
3657 raw: "Invalid prompt: messages must not be empty",
3658 });
3659 }
3660
3661 // Get or create mutex for this workspace
3662 if (!this.streamLocks.has(typedWorkspaceId)) {
3663 this.streamLocks.set(typedWorkspaceId, new AsyncMutex());
3664 }
3665 const mutex = this.streamLocks.get(typedWorkspaceId)!;
3666
3667 try {
3668 // Acquire lock - guarantees only one startStream per workspace
3669 // Lock is automatically released when scope exits via Symbol.asyncDispose
3670 await using _lock = await mutex.acquire();
3671
3672 // DEBUG: Log stream start
3673 log.debug(
3674 `[STREAM START] workspaceId=${workspaceId} historySequence=${historySequence} model=${modelString}`
3675 );
3676
3677 const streamAbortController = new AbortController();
3678 const unlinkAbortSignal = linkAbortSignal(abortSignal, streamAbortController);
3679
3680 let runtimeTempDir: string | undefined;
3681 let streamRegistered = false;
3682

Callers 2

streamMessageMethod · 0.45

Calls 15

ensureStreamSafetyMethod · 0.95
cleanupStreamTempDirMethod · 0.95
ErrFunction · 0.90
linkAbortSignalFunction · 0.90
OkFunction · 0.90
runLanguageModelCleanupFunction · 0.90
setMethod · 0.80
debugMethod · 0.80

Tested by

no test coverage detected