(
client: Client,
request: Request<Rpcs>,
stream: Stream.Stream<any, any> | Effect.Effect<Mailbox.ReadonlyMailbox<any, any>, any, Scope.Scope>
)
| 354 | } |
| 355 | |
/**
 * Builds the effect that forwards everything produced by `stream` to
 * `options.onFromServer` as `"Chunk"` messages tagged with the client id and
 * the request id.
 *
 * `stream` is either a `Stream`, which is consumed chunk by chunk, or an
 * effect yielding a mailbox, which is drained with `takeAll` until the
 * mailbox reports that it is done (the mailbox effect is run inside
 * `Effect.scoped`).
 *
 * When a latch is associated with the request, every write is preceded by
 * closing that latch and followed by awaiting it.
 */
const streamEffect = (
  client: Client,
  request: Request<Rpcs>,
  stream: Stream.Stream<any, any> | Effect.Effect<Mailbox.ReadonlyMailbox<any, any>, any, Scope.Scope>
) => {
  // Reuse the latch already registered for this request; if there is none and
  // `supportsAck` is set, create one and register it under the request id.
  let ackLatch = client.latches.get(request.id)
  if (supportsAck && !ackLatch) {
    ackLatch = Effect.unsafeMakeLatch(false)
    client.latches.set(request.id, ackLatch)
  }

  // Shared by both stream shapes: empty chunks are skipped, non-empty chunks
  // are handed to `onFromServer`.
  const forwardChunk = (chunk: Chunk.Chunk<any>) => {
    if (!Chunk.isNonEmpty(chunk)) return Effect.void
    const write = options.onFromServer({
      _tag: "Chunk",
      clientId: client.id,
      requestId: request.id,
      values: Chunk.toReadonlyArray(chunk)
    })
    if (!ackLatch) return write
    // Close before writing, then wait on the latch once the write is done.
    // NOTE(review): presumably the latch is reopened elsewhere when the client
    // acknowledges the chunk; confirm against the ack handler.
    ackLatch.unsafeClose()
    return Effect.zipRight(write, ackLatch.await)
  }

  // Plain stream: forward each chunk as it is emitted.
  if (!Effect.isEffect(stream)) {
    return Stream.runForEachChunk(stream, forwardChunk)
  }

  // Mailbox: keep taking everything available until `takeAll` reports done.
  let done = false
  return Effect.scoped(
    Effect.flatMap(stream, (mailbox) =>
      Effect.whileLoop({
        while: () => !done,
        body: constant(
          Effect.flatMap(mailbox.takeAll, ([chunk, isDone]) => {
            // Record completion first so the loop stops even if this last
            // chunk turns out to be empty.
            done = isDone
            return forwardChunk(chunk)
          })
        ),
        step: constVoid
      }))
  )
}
| 404 | |
| 405 | const sendDefect = (client: Client, defect: unknown) => |
| 406 | Effect.suspend(() => { |
no test coverage detected