/**
 * Rewrites the chunk structure of `self` so that each emitted chunk is the
 * concatenation of every chunk read from upstream so far (the new chunk is
 * appended to the running accumulation before being written downstream).
 *
 * Upstream failures are re-raised with the same error, and the accumulating
 * channel finishes with `core.void` once upstream is done.
 *
 * @internal
 */
export const accumulateChunks = <A, E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A, E, R> => {
  // `seen` carries every element read from upstream up to this point.
  function loop(
    seen: Chunk.Chunk<A>
  ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E, E, void, unknown> {
    return core.readWith({
      onInput: (incoming: Chunk.Chunk<A>) => {
        const combined = Chunk.appendAll(seen, incoming)
        const emit = core.write(combined)
        // Continue reading only after the accumulated chunk has been written.
        return core.flatMap(emit, () => loop(combined))
      },
      onFailure: (error: E) => core.fail(error),
      onDone: () => core.void
    })
  }

  const upstream = toChannel(self)
  return new StreamImpl(core.pipeTo(upstream, loop(Chunk.empty<A>())))
}
| 118 | |