MCPcopy
hub / github.com/Effect-TS/effect / start

Function start

packages/effect/src/internal/stream.ts:7215–7238  ·  view source on GitHub ↗
(controller)

Source from the content-addressed store, hash-verified

7213
7214 return new ReadableStream<A>({
7215 start(controller) {
7216 fiber = runFork(runForEachChunk(self, (chunk) => {
7217 if (chunk.length === 0) return Effect.void
7218 return latch.whenOpen(Effect.sync(() => {
7219 latch.unsafeClose()
7220 for (const item of chunk) {
7221 controller.enqueue(item)
7222 }
7223 currentResolve!()
7224 currentResolve = undefined
7225 }))
7226 }))
7227 fiber.addObserver((exit) => {
7228 try {
7229 if (exit._tag === "Failure") {
7230 controller.error(Cause.squash(exit.cause))
7231 } else {
7232 controller.close()
7233 }
7234 } catch {
7235 // ignore
7236 }
7237 })
7238 },
7239 pull() {
7240 return new Promise<void>((resolve) => {
7241 currentResolve = resolve

Callers

nothing calls this directly

Calls 6

syncMethod · 0.80
unsafeCloseMethod · 0.80
runForkFunction · 0.70
addObserverMethod · 0.65
errorMethod · 0.65
closeMethod · 0.65

Tested by

no test coverage detected