MCPcopy
hub / github.com/TanStack/ai / normalizeConnectionAdapter

Function normalizeConnectionAdapter

packages/ai-client/src/connection-adapters.ts:254–396  ·  view source on GitHub ↗
(
  connection: ConnectionAdapter | undefined,
)

Source from the content-addressed store, hash-verified

252 * Otherwise, connect() is wrapped using an async queue.
253 */
254 export function normalizeConnectionAdapter(
255 connection: ConnectionAdapter | undefined,
256): SubscribeConnectionAdapter {
257 if (!connection) {
258 throw new Error('Connection adapter is required')
259 }
260
261 const hasConnect = 'connect' in connection
262 const hasSubscribe = 'subscribe' in connection
263 const hasSend = 'send' in connection
264
265 if (hasConnect && (hasSubscribe || hasSend)) {
266 throw new Error(
267 'Connection adapter must provide either connect or both subscribe and send, not both modes',
268 )
269 }
270
271 if (hasSubscribe && hasSend) {
272 return {
273 subscribe: connection.subscribe.bind(connection),
274 send: connection.send.bind(connection),
275 }
276 }
277
278 if (!hasConnect) {
279 throw new Error(
280 'Connection adapter must provide either connect or both subscribe and send',
281 )
282 }
283
284 // Legacy connect() wrapper
285 let activeBuffer: Array<StreamChunk> = []
286 let activeWaiters: Array<(chunk: StreamChunk | null) => void> = []
287
288 function push(chunk: StreamChunk, runId?: string): void {
289 if (runId) {
290 chunkRunIds.set(chunk, runId)
291 }
292 const waiter = activeWaiters.shift()
293 if (waiter) {
294 waiter(chunk)
295 } else {
296 activeBuffer.push(chunk)
297 }
298 }
299
300 return {
301 subscribe(abortSignal?: AbortSignal): AsyncIterable<StreamChunk> {
302 // Transfer ownership to the latest subscriber so only one active
303 // subscribe() call receives chunks from the shared connect-wrapper queue.
304 const myBuffer: Array<StreamChunk> = activeBuffer.splice(0)
305 const myWaiters: Array<(chunk: StreamChunk | null) => void> = []
306 activeBuffer = myBuffer
307 activeWaiters = myWaiters
308
309 return (async function* () {
310 while (!abortSignal?.aborted) {
311 let chunk: StreamChunk | null

Callers 3

constructor · Method · 0.90
updateOptions · Method · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected