( self: Stream.Stream<A, E, R> )
| 8609 | |
/**
 * Pairs every element of `self` with the element that follows it.
 *
 * Each element is held back until its successor is read, then emitted as
 * `[element, Option.some(successor)]`. When the upstream finishes, an element
 * still being held is emitted as `[element, Option.none()]`. Upstream failures
 * are forwarded through `core.failCause`.
 *
 * @internal
 */
export const zipWithNext = <A, E, R>(
  self: Stream.Stream<A, E, R>
): Stream.Stream<[A, Option.Option<A>], E, R> => {
  // `pending` is the most recently read element that has not been emitted yet
  // because its successor is not known (`none` before the first element).
  const loop = (
    pending: Option.Option<A>
  ): Channel.Channel<Chunk.Chunk<readonly [A, Option.Option<A>]>, Chunk.Chunk<A>, never, never, void, unknown> =>
    core.readWithCause({
      onInput: (elements: Chunk.Chunk<A>) => {
        // Thread the held element through the chunk: each step yields the pair
        // (held, current) when something was held, and then holds `current`.
        const [carried, candidates] = Chunk.mapAccum(
          elements,
          pending,
          (previous, current) => {
            const pair = pipe(previous, Option.map((held) => [held, current] as const))
            return [Option.some(current), pair] as const
          }
        )
        // Keep only the steps that produced a pair, wrapping the successor in `some`.
        const completed = Chunk.filterMap(candidates, (candidate) => {
          if (!Option.isSome(candidate)) {
            return Option.none()
          }
          const [held, successor] = candidate.value
          return Option.some([held, Option.some(successor)] as const)
        })
        // The write is unconditional; afterwards keep reading with the new held element.
        return core.flatMap(core.write(completed), () => loop(carried))
      },
      onFailure: core.failCause,
      onDone: () =>
        Option.match(pending, {
          onSome: (finalElement) => {
            // The held element has no successor: emit it paired with `none`.
            const trailing = Chunk.of<readonly [A, Option.Option<A>]>([finalElement, Option.none()])
            return channel.zipRight(core.write(trailing), core.void)
          },
          onNone: () => core.void
        })
    })

  const zipped = pipe(toChannel(self), channel.pipeToOrFail(loop(Option.none())))
  return new StreamImpl(zipped)
}
| 8649 | |
| 8650 | /** @internal */ |
| 8651 | export const zipWithPrevious = <A, E, R>( |
no test coverage detected