MCPcopy Index your code
hub / github.com/Effect-TS/effect / zipWithNext

Function zipWithNext

packages/effect/src/internal/stream.ts:8611–8648  ·  view source on GitHub ↗
(
  self: Stream.Stream<A, E, R>
)

Source from the content-addressed store, hash-verified

8609
8610/** @internal */
8611export const zipWithNext = <A, E, R>(
8612 self: Stream.Stream<A, E, R>
8613): Stream.Stream<[A, Option.Option<A>], E, R> => {
8614 const process = (
8615 last: Option.Option<A>
8616 ): Channel.Channel<Chunk.Chunk<readonly [A, Option.Option<A>]>, Chunk.Chunk<A>, never, never, void, unknown> =>
8617 core.readWithCause({
8618 onInput: (input: Chunk.Chunk<A>) => {
8619 const [newLast, chunk] = Chunk.mapAccum(
8620 input,
8621 last,
8622 (prev, curr) => [Option.some(curr), pipe(prev, Option.map((a) => [a, curr] as const))] as const
8623 )
8624 const output = Chunk.filterMap(
8625 chunk,
8626 (option) =>
8627 Option.isSome(option) ?
8628 Option.some([option.value[0], Option.some(option.value[1])] as const) :
8629 Option.none()
8630 )
8631 return core.flatMap(
8632 core.write(output),
8633 () => process(newLast)
8634 )
8635 },
8636 onFailure: core.failCause,
8637 onDone: () =>
8638 Option.match(last, {
8639 onNone: () => core.void,
8640 onSome: (value) =>
8641 channel.zipRight(
8642 core.write(Chunk.of<readonly [A, Option.Option<A>]>([value, Option.none()])),
8643 core.void
8644 )
8645 })
8646 })
8647 return new StreamImpl(pipe(toChannel(self), channel.pipeToOrFail(process(Option.none()))))
8648}
8649
8650/** @internal */
8651export const zipWithPrevious = <A, E, R>(

Callers 1

zipWithPreviousAndNextFunction · 0.85

Calls 3

pipeFunction · 0.70
toChannelFunction · 0.70
processFunction · 0.70

Tested by

no test coverage detected