| 6277 | return pipe(core.write(result), core.flatMap(() => channelEnd)) |
| 6278 | } |
// Recursive channel that consumes upstream chunks and writes the windows
// completed by each chunk. `queueSize` is the number of elements already
// pushed into `queue` by earlier invocations.
const reader = (
  queueSize: number
): Channel.Channel<Chunk.Chunk<Chunk.Chunk<A>>, Chunk.Chunk<A>, E, E, unknown, unknown> =>
  core.readWithCause({
    onInput: (input: Chunk.Chunk<A>) => {
      // Evaluated eagerly and in element order: every element is pushed into
      // `queue` before the emit decision, so a snapshot taken for an element
      // already contains that element.
      const windows = Chunk.filterMap(input, (element, index) => {
        queue.put(element)
        // 1-based count of elements seen so far, including this one.
        const seen = queueSize + index + 1
        // Skip until `chunkSize` elements have been seen, and afterwards skip
        // every position that is not a whole number of `stepSize` steps past it.
        const skip = seen < chunkSize || (seen - chunkSize) % stepSize > 0
        return skip ? Option.none() : Option.some(queue.toChunk())
      })
      // Write the completed windows, then keep reading with the updated count.
      return pipe(
        core.write(windows),
        core.flatMap(() => reader(queueSize + input.length))
      )
    },
    // Upstream failure and normal completion both hand off to
    // `emitOnStreamEnd`, differing only in the channel that runs afterwards.
    onFailure: (cause) => emitOnStreamEnd(queueSize, core.failCause(cause)),
    onDone: () => emitOnStreamEnd(queueSize, core.void)
  })
| 6300 | return pipe(toChannel(self), core.pipeTo(reader(0))) |
| 6301 | })) |
| 6302 | } |