hub / github.com/Effect-TS/effect / emit

Function emit

packages/ai/amazon-bedrock/src/EventStreamEncoding.ts:57–106
Signature: emit(chunks)

Source from the content-addressed store, hash-verified

    return Effect.void
  },
  emit(chunks) {
    return Effect.forEach(
      chunks,
      Effect.fnUntraced(function*(chunk) {
        // Append new chunk to buffer
        const newBuffer = new Uint8Array(buffer.length + chunk.length)
        newBuffer.set(buffer)
        newBuffer.set(chunk, buffer.length)
        buffer = newBuffer

        // Try to decode messages from the buffer
        while (buffer.length >= 4) {
          // The first four bytes are the total length of the message (big-endian)
          const totalLength = new DataView(
            buffer.buffer,
            buffer.byteOffset,
            buffer.byteLength
          ).getUint32(0, false)

          // If we don't have the full message yet, keep looping
          if (buffer.length < totalLength) {
            break
          }

          // Decode exactly the sub-slice for this event
          const subView = buffer.subarray(0, totalLength)
          const decoded = codec.decode(subView)

          // Slice the used bytes off the buffer, removing this message
          buffer = buffer.slice(totalLength)

          // Process the message
          if (decoded.headers[":message-type"]?.value === "event") {
            const data = textDecoder.decode(decoded.body)

            // Wrap the data in the `":event-type"` field to match the
            // expected schema
            const message = yield* decodeMessage({
              [decoded.headers[":event-type"]?.value as string]: JSON.parse(data)
            }).pipe(Effect.provide(context))

            messages.push(message)
          }
        }
        yield* mailbox.offerAll(messages)
        messages = []
      }),
      { discard: true }
    ).pipe(Effect.catchAll((error) => mailbox.fail(error)))
  },
  error(cause) {
    return mailbox.failCause(cause)
  },
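
The buffering loop in emit can be hard to follow inside the Effect plumbing, so here is a minimal standalone sketch of the same length-prefixed framing idea. It is not code from the repository: createFrameSplitter is a hypothetical helper, it returns raw frames instead of decoding them with a codec, and the guard against lengths below four bytes is an assumption added for the sketch rather than something the original does.

```typescript
// Hypothetical helper, not part of the Effect codebase. It splits a byte
// stream into frames whose first four bytes hold the total frame length
// (big-endian), mirroring the buffering strategy used by `emit`.
function createFrameSplitter(): (chunk: Uint8Array) => Array<Uint8Array> {
  // Bytes received so far that do not yet form a complete frame
  let buffer: Uint8Array = new Uint8Array(0)

  return (chunk) => {
    // Append the new chunk to whatever was left over from earlier calls
    const next = new Uint8Array(buffer.length + chunk.length)
    next.set(buffer)
    next.set(chunk, buffer.length)
    buffer = next

    const frames: Array<Uint8Array> = []
    while (buffer.length >= 4) {
      // Read the 4-byte big-endian length prefix at the start of the buffer
      const totalLength = new DataView(
        buffer.buffer,
        buffer.byteOffset,
        buffer.byteLength
      ).getUint32(0, false)

      // Assumption for this sketch: the length counts its own 4-byte prefix,
      // so anything smaller is treated as corrupt input
      if (totalLength < 4) {
        throw new Error(`Invalid frame length: ${totalLength}`)
      }

      // Incomplete frame: stop and wait for the next chunk
      if (buffer.length < totalLength) {
        break
      }

      // Copy the frame out, then drop its bytes from the buffer
      frames.push(buffer.slice(0, totalLength))
      buffer = buffer.slice(totalLength)
    }
    return frames
  }
}
```

Calling the returned function once per incoming chunk yields zero or more complete frames each time. A frame split across two chunks is only returned after the second chunk arrives, and a single chunk that contains several frames returns all of them at once.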

Callers

nothing calls this directly

Calls 7

decode (method) · 0.80
parse (method) · 0.80
pipe (method) · 0.65
set (method) · 0.65
provide (method) · 0.65
offerAll (method) · 0.65
fail (method) · 0.65

Tested by

no test coverage detected
