MCPcopy Index your code
hub / github.com/Effect-TS/effect / streamRequest

Function streamRequest

packages/ai/openai/src/OpenAiClient.ts:125–164  ·  view source on GitHub ↗
(
      request: HttpClientRequest.HttpClientRequest,
      schema: Schema.Schema<A, I, R>
    )

Source from the content-addressed store, hash-verified

123 })
124
125 const streamRequest = <A, I, R>(
126 request: HttpClientRequest.HttpClientRequest,
127 schema: Schema.Schema<A, I, R>
128 ): Stream.Stream<A, AiError.AiError, R> => {
129 const decodeEvent = Schema.decode(Schema.parseJson(schema))
130 return httpClientOk.execute(request).pipe(
131 Effect.map((r) => r.stream),
132 Stream.unwrapScoped,
133 Stream.decodeText(),
134 Stream.pipeThroughChannel(Sse.makeChannel()),
135 // Decode each SSE event, but don't let a single unrecognized or malformed
136 // frame abort the whole stream. OpenAI emits events absent from the generated
137 // schema (e.g. `keepalive` heartbeats during long Responses turns); skip what
138 // we can't decode and keep the stream alive.
139 Stream.mapEffect((event) =>
140 decodeEvent(event.data).pipe(
141 Effect.asSome,
142 Effect.catchTag("ParseError", (error) =>
143 Effect.logDebug("Skipping undecodable stream event", error).pipe(
144 Effect.as(Option.none())
145 ))
146 )
147 ),
148 Stream.filterMap(identity),
149 Stream.catchTags({
150 RequestError: (error) =>
151 AiError.HttpRequestError.fromRequestError({
152 module: "OpenAiClient",
153 method: "streamRequest",
154 error
155 }),
156 ResponseError: (error) =>
157 AiError.HttpResponseError.fromResponseError({
158 module: "OpenAiClient",
159 method: "streamRequest",
160 error
161 })
162 })
163 )
164 }
165
166 const createResponse = (
167 options: typeof Generated.CreateResponse.Encoded

Callers 1

createResponseStreamFunction · 0.70

Calls 6

decodeMethod · 0.80
fromRequestErrorMethod · 0.80
fromResponseErrorMethod · 0.80
pipeMethod · 0.65
mapMethod · 0.65
executeMethod · 0.45

Tested by

no test coverage detected