( connection: ConnectionAdapter | undefined, )
| 252 | * Otherwise, connect() is wrapped using an async queue. |
| 253 | */ |
| 254 | export function normalizeConnectionAdapter( |
| 255 | connection: ConnectionAdapter | undefined, |
| 256 | ): SubscribeConnectionAdapter { |
| 257 | if (!connection) { |
| 258 | throw new Error('Connection adapter is required') |
| 259 | } |
| 260 | |
| 261 | const hasConnect = 'connect' in connection |
| 262 | const hasSubscribe = 'subscribe' in connection |
| 263 | const hasSend = 'send' in connection |
| 264 | |
| 265 | if (hasConnect && (hasSubscribe || hasSend)) { |
| 266 | throw new Error( |
| 267 | 'Connection adapter must provide either connect or both subscribe and send, not both modes', |
| 268 | ) |
| 269 | } |
| 270 | |
| 271 | if (hasSubscribe && hasSend) { |
| 272 | return { |
| 273 | subscribe: connection.subscribe.bind(connection), |
| 274 | send: connection.send.bind(connection), |
| 275 | } |
| 276 | } |
| 277 | |
| 278 | if (!hasConnect) { |
| 279 | throw new Error( |
| 280 | 'Connection adapter must provide either connect or both subscribe and send', |
| 281 | ) |
| 282 | } |
| 283 | |
| 284 | // Legacy connect() wrapper |
| 285 | let activeBuffer: Array<StreamChunk> = [] |
| 286 | let activeWaiters: Array<(chunk: StreamChunk | null) => void> = [] |
| 287 | |
| 288 | function push(chunk: StreamChunk, runId?: string): void { |
| 289 | if (runId) { |
| 290 | chunkRunIds.set(chunk, runId) |
| 291 | } |
| 292 | const waiter = activeWaiters.shift() |
| 293 | if (waiter) { |
| 294 | waiter(chunk) |
| 295 | } else { |
| 296 | activeBuffer.push(chunk) |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | return { |
| 301 | subscribe(abortSignal?: AbortSignal): AsyncIterable<StreamChunk> { |
| 302 | // Transfer ownership to the latest subscriber so only one active |
| 303 | // subscribe() call receives chunks from the shared connect-wrapper queue. |
| 304 | const myBuffer: Array<StreamChunk> = activeBuffer.splice(0) |
| 305 | const myWaiters: Array<(chunk: StreamChunk | null) => void> = [] |
| 306 | activeBuffer = myBuffer |
| 307 | activeWaiters = myWaiters |
| 308 | |
| 309 | return (async function* () { |
| 310 | while (!abortSignal?.aborted) { |
| 311 | let chunk: StreamChunk | null |
no outgoing calls
no test coverage detected