MCPcopy Index your code
hub / github.com/Effect-TS/effect / reader

Function reader

packages/effect/src/internal/stream.ts:6279–6299  ·  view source on GitHub ↗
(
        queueSize: number
      )

Source from the content-addressed store, hash-verified

6277 return pipe(core.write(result), core.flatMap(() => channelEnd))
6278 }
6279 const reader = (
6280 queueSize: number
6281 ): Channel.Channel<Chunk.Chunk<Chunk.Chunk<A>>, Chunk.Chunk<A>, E, E, unknown, unknown> =>
6282 core.readWithCause({
6283 onInput: (input: Chunk.Chunk<A>) =>
6284 core.flatMap(
6285 core.write(
6286 Chunk.filterMap(input, (element, index) => {
6287 queue.put(element)
6288 const currentIndex = queueSize + index + 1
6289 if (currentIndex < chunkSize || (currentIndex - chunkSize) % stepSize > 0) {
6290 return Option.none()
6291 }
6292 return Option.some(queue.toChunk())
6293 })
6294 ),
6295 () => reader(queueSize + input.length)
6296 ),
6297 onFailure: (cause) => emitOnStreamEnd(queueSize, core.failCause(cause)),
6298 onDone: () => emitOnStreamEnd(queueSize, core.void)
6299 })
6300 return pipe(toChannel(self), core.pipeTo(reader(0)))
6301 }))
6302 }

Callers 1

stream.tsFile · 0.85

Calls 5

emitOnStreamEndFunction · 0.85
putMethod · 0.80
toChunkMethod · 0.80
failCauseMethod · 0.80
writeMethod · 0.65

Tested by

no test coverage detected