(
request: HttpClientRequest.HttpClientRequest,
schema: Schema.Schema<A, I, R>
)
| 123 | }) |
| 124 | |
/**
 * Executes `request` and exposes the response body as a stream of values
 * decoded with `schema`.
 *
 * The response body is decoded to text and run through the SSE channel; each
 * event's `data` payload is then parsed as JSON and decoded with `schema`.
 * Events that fail to decode are logged at debug level and dropped instead of
 * failing the stream. `RequestError` and `ResponseError` failures are converted
 * into `AiError.HttpRequestError` and `AiError.HttpResponseError` respectively.
 *
 * @param request - The HTTP request to execute.
 * @param schema - Schema used to decode the JSON payload of every SSE event.
 * @returns A stream of the successfully decoded event payloads.
 */
const streamRequest = <A, I, R>(
  request: HttpClientRequest.HttpClientRequest,
  schema: Schema.Schema<A, I, R>
): Stream.Stream<A, AiError.AiError, R> => {
  const decodeEvent = Schema.decode(Schema.parseJson(schema))

  // A single unrecognized or malformed frame must not abort the whole stream:
  // OpenAI emits events absent from the generated schema (e.g. `keepalive`
  // heartbeats during long Responses turns). Such frames yield `None` here so
  // they can be filtered out downstream while the stream stays alive.
  const decodeOrSkip = (data: string) =>
    Effect.catchTag(
      Effect.asSome(decodeEvent(data)),
      "ParseError",
      (parseError) =>
        Effect.as(
          Effect.logDebug("Skipping undecodable stream event", parseError),
          Option.none()
        )
    )

  // Stream over the body of the (scoped) HTTP response.
  const responseBody = Stream.unwrapScoped(
    Effect.map(httpClientOk.execute(request), (response) => response.stream)
  )

  return responseBody.pipe(
    Stream.decodeText(),
    Stream.pipeThroughChannel(Sse.makeChannel()),
    Stream.mapEffect((sseEvent) => decodeOrSkip(sseEvent.data)),
    // Keep only the events that decoded successfully.
    Stream.filterMap(identity),
    Stream.catchTags({
      RequestError: (cause) =>
        AiError.HttpRequestError.fromRequestError({
          module: "OpenAiClient",
          method: "streamRequest",
          error: cause
        }),
      ResponseError: (cause) =>
        AiError.HttpResponseError.fromResponseError({
          module: "OpenAiClient",
          method: "streamRequest",
          error: cause
        })
    })
  )
}
| 165 | |
| 166 | const createResponse = ( |
| 167 | options: typeof Generated.CreateResponse.Encoded |
no test coverage detected