(controller)
| 7213 | |
| 7214 | return new ReadableStream<A>({ |
| 7215 | start(controller) { |
| 7216 | fiber = runFork(runForEachChunk(self, (chunk) => { |
| 7217 | if (chunk.length === 0) return Effect.void |
| 7218 | return latch.whenOpen(Effect.sync(() => { |
| 7219 | latch.unsafeClose() |
| 7220 | for (const item of chunk) { |
| 7221 | controller.enqueue(item) |
| 7222 | } |
| 7223 | currentResolve!() |
| 7224 | currentResolve = undefined |
| 7225 | })) |
| 7226 | })) |
| 7227 | fiber.addObserver((exit) => { |
| 7228 | try { |
| 7229 | if (exit._tag === "Failure") { |
| 7230 | controller.error(Cause.squash(exit.cause)) |
| 7231 | } else { |
| 7232 | controller.close() |
| 7233 | } |
| 7234 | } catch { |
| 7235 | // ignore |
| 7236 | } |
| 7237 | }) |
| 7238 | }, |
| 7239 | pull() { |
| 7240 | return new Promise<void>((resolve) => { |
| 7241 | currentResolve = resolve |
nothing calls this directly
no test coverage detected