MCPcopy
hub / github.com/Effect-TS/effect / streamRequest

Function streamRequest

packages/ai/anthropic/src/AnthropicClient.ts:227–259  ·  view source on GitHub ↗
(
    request: HttpClientRequest.HttpClientRequest,
    schema: Schema.Schema<A, I, R>
  )

Source from the content-addressed store, hash-verified

225 })
226
227 const streamRequest = <A, I, R>(
228 request: HttpClientRequest.HttpClientRequest,
229 schema: Schema.Schema<A, I, R>
230 ): Stream.Stream<A, AiError.AiError, R> => {
231 const decodeEvents = Schema.decode(Schema.ChunkFromSelf(Schema.parseJson(schema)))
232 return httpClientOk.execute(request).pipe(
233 Effect.map((r) => r.stream),
234 Stream.unwrapScoped,
235 Stream.decodeText(),
236 Stream.pipeThroughChannel(Sse.makeChannel()),
237 Stream.mapChunksEffect((chunk) => decodeEvents(Chunk.map(chunk, (event) => event.data))),
238 Stream.catchTags({
239 RequestError: (error) =>
240 AiError.HttpRequestError.fromRequestError({
241 module: "AnthropicClient",
242 method: "streamRequest",
243 error
244 }),
245 ResponseError: (error) =>
246 AiError.HttpResponseError.fromResponseError({
247 module: "AnthropicClient",
248 method: "streamRequest",
249 error
250 }),
251 ParseError: (error) =>
252 AiError.MalformedOutput.fromParseError({
253 module: "AnthropicClient",
254 method: "streamRequest",
255 error
256 })
257 })
258 )
259 }
260
261 const createMessage: (options: {
262 readonly params?: typeof Generated.BetaMessagesPostParams.Encoded | undefined

Callers 1

createMessageStreamFunction · 0.70

Calls 7

decodeMethod · 0.80
fromRequestErrorMethod · 0.80
fromResponseErrorMethod · 0.80
pipeMethod · 0.65
mapMethod · 0.65
executeMethod · 0.45
fromParseErrorMethod · 0.45

Tested by

no test coverage detected