* Starts a new stream for a workspace, automatically cancelling any existing stream. * * Uses a per-workspace mutex to prevent concurrent streams. The mutex ensures: * 1. Only one startStream call can execute at a time per workspace * 2. The old stream fully exits before the new stream starts * 3. No race conditions in stream registration or cleanup
(
workspaceId: string,
messages: ModelMessage[],
model: LanguageModel,
modelString: string,
historySequence: number,
system: string,
runtime: Runtime,
messageId: string,
abortSignal?: AbortSignal,
tools?: Record<string, Tool>,
initialMetadata?: Partial<MuxMetadata>,
providerOptions?: Record<string, unknown>,
maxOutputTokens?: number,
toolPolicy?: ToolPolicy,
providedStreamToken?: StreamToken,
hasQueuedMessages?: (dispatchMode?: "tool-end" | "turn-end") => boolean,
workspaceName?: string,
thinkingLevel?: string,
headers?: Record<string, string | undefined>,
anthropicCacheTtlOverride?: AnthropicCacheTtl,
callSettingsOverrides?: ResolvedCallSettingsOverrides,
onChunk?: StreamTextOnChunk,
onStepMessages?: (messages: ModelMessage[]) => void,
providedRuntimeTempDir?: string,
modelFallback?: ModelFallbackOptions
)
| 3623 | * 3. No race conditions in stream registration or cleanup |
| 3624 | */ |
| 3625 | async startStream( |
| 3626 | workspaceId: string, |
| 3627 | messages: ModelMessage[], |
| 3628 | model: LanguageModel, |
| 3629 | modelString: string, |
| 3630 | historySequence: number, |
| 3631 | system: string, |
| 3632 | runtime: Runtime, |
| 3633 | messageId: string, |
| 3634 | abortSignal?: AbortSignal, |
| 3635 | tools?: Record<string, Tool>, |
| 3636 | initialMetadata?: Partial<MuxMetadata>, |
| 3637 | providerOptions?: Record<string, unknown>, |
| 3638 | maxOutputTokens?: number, |
| 3639 | toolPolicy?: ToolPolicy, |
| 3640 | providedStreamToken?: StreamToken, |
| 3641 | hasQueuedMessages?: (dispatchMode?: "tool-end" | "turn-end") => boolean, |
| 3642 | workspaceName?: string, |
| 3643 | thinkingLevel?: string, |
| 3644 | headers?: Record<string, string | undefined>, |
| 3645 | anthropicCacheTtlOverride?: AnthropicCacheTtl, |
| 3646 | callSettingsOverrides?: ResolvedCallSettingsOverrides, |
| 3647 | onChunk?: StreamTextOnChunk, |
| 3648 | onStepMessages?: (messages: ModelMessage[]) => void, |
| 3649 | providedRuntimeTempDir?: string, |
| 3650 | modelFallback?: ModelFallbackOptions |
| 3651 | ): Promise<Result<StreamToken, SendMessageError>> { |
| 3652 | const typedWorkspaceId = workspaceId as WorkspaceId; |
| 3653 | |
| 3654 | if (messages.length === 0) { |
| 3655 | return Err({ |
| 3656 | type: "unknown", |
| 3657 | raw: "Invalid prompt: messages must not be empty", |
| 3658 | }); |
| 3659 | } |
| 3660 | |
| 3661 | // Get or create mutex for this workspace |
| 3662 | if (!this.streamLocks.has(typedWorkspaceId)) { |
| 3663 | this.streamLocks.set(typedWorkspaceId, new AsyncMutex()); |
| 3664 | } |
| 3665 | const mutex = this.streamLocks.get(typedWorkspaceId)!; |
| 3666 | |
| 3667 | try { |
| 3668 | // Acquire lock - guarantees only one startStream per workspace |
| 3669 | // Lock is automatically released when scope exits via Symbol.asyncDispose |
| 3670 | await using _lock = await mutex.acquire(); |
| 3671 | |
| 3672 | // DEBUG: Log stream start |
| 3673 | log.debug( |
| 3674 | `[STREAM START] workspaceId=${workspaceId} historySequence=${historySequence} model=${modelString}` |
| 3675 | ); |
| 3676 | |
| 3677 | const streamAbortController = new AbortController(); |
| 3678 | const unlinkAbortSignal = linkAbortSignal(abortSignal, streamAbortController); |
| 3679 | |
| 3680 | let runtimeTempDir: string | undefined; |
| 3681 | let streamRegistered = false; |
| 3682 |
no test coverage detected