MCPcopy Index your code
hub / github.com/coder/mux / createStreamAtomically

Method createStreamAtomically

src/node/services/streamManager.ts:1555–1666  ·  view source on GitHub ↗

* Atomically creates a new stream with all necessary setup

(
    workspaceId: WorkspaceId,
    streamToken: StreamToken,
    runtimeTempDir: string,
    runtime: Runtime,
    messages: ModelMessage[],
    model: LanguageModel,
    modelString: string,
    abortController: AbortController,
    system: string,
    historySequence: number,
    messageId: string,
    tools?: Record<string, Tool>,
    initialMetadata?: Partial<MuxMetadata>,
    providerOptions?: Record<string, unknown>,
    maxOutputTokens?: number,
    toolPolicy?: ToolPolicy,
    callSettingsOverrides?: ResolvedCallSettingsOverrides,
    hasQueuedMessages?: (dispatchMode?: "tool-end" | "turn-end") => boolean,
    workspaceName?: string,
    thinkingLevel?: string,
    headers?: Record<string, string | undefined>,
    anthropicCacheTtlOverride?: AnthropicCacheTtl,
    onChunk?: StreamTextOnChunk,
    onStepMessages?: (messages: ModelMessage[]) => void,
    modelFallback?: ModelFallbackOptions
  )

Source from the content-addressed store, hash-verified

1553 * Atomically creates a new stream with all necessary setup
1554 */
1555 private createStreamAtomically(
1556 workspaceId: WorkspaceId,
1557 streamToken: StreamToken,
1558 runtimeTempDir: string,
1559 runtime: Runtime,
1560 messages: ModelMessage[],
1561 model: LanguageModel,
1562 modelString: string,
1563 abortController: AbortController,
1564 system: string,
1565 historySequence: number,
1566 messageId: string,
1567 tools?: Record<string, Tool>,
1568 initialMetadata?: Partial<MuxMetadata>,
1569 providerOptions?: Record<string, unknown>,
1570 maxOutputTokens?: number,
1571 toolPolicy?: ToolPolicy,
1572 callSettingsOverrides?: ResolvedCallSettingsOverrides,
1573 hasQueuedMessages?: (dispatchMode?: "tool-end" | "turn-end") => boolean,
1574 workspaceName?: string,
1575 thinkingLevel?: string,
1576 headers?: Record<string, string | undefined>,
1577 anthropicCacheTtlOverride?: AnthropicCacheTtl,
1578 onChunk?: StreamTextOnChunk,
1579 onStepMessages?: (messages: ModelMessage[]) => void,
1580 modelFallback?: ModelFallbackOptions
1581 ): WorkspaceStreamInfo {
1582 // abortController is created and linked to the caller-provided abortSignal in startStream().
1583
1584 const stepTracker: StepMessageTracker = {};
1585 const metadataModel = this.resolveMetadataModel(modelString);
1586 const request = this.buildStreamRequestConfig(
1587 model,
1588 modelString,
1589 messages,
1590 system,
1591 tools,
1592 providerOptions,
1593 maxOutputTokens,
1594 callSettingsOverrides,
1595 toolPolicy,
1596 hasQueuedMessages,
1597 headers,
1598 anthropicCacheTtlOverride,
1599 onChunk,
1600 onStepMessages
1601 );
1602
1603 // Start streaming - this can throw immediately if API key is missing
1604 let streamResult;
1605 try {
1606 streamResult = this.createStreamResult(request, abortController, stepTracker);
1607 } catch (error) {
1608 // Clean up abort controller if stream creation fails
1609 abortController.abort();
1610 // Re-throw the error to be caught by startStream
1611 throw error;
1612 }

Callers 1

startStreamMethod · 0.95

Calls 7

resolveMetadataModelMethod · 0.95
createStreamResultMethod · 0.95
normalizeToCanonicalFunction · 0.90
resolveMethod · 0.80
setMethod · 0.80
abortMethod · 0.65

Tested by

no test coverage detected