MCPcopy Index your code
hub / github.com/Effect-TS/effect / processChunk

Function processChunk

packages/effect/src/internal/stream.ts:2915–2945  ·  view source on GitHub ↗
(
    chunk: Chunk.Chunk<Exit.Exit<A, Option.Option<E2>>>,
    cont: Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<Exit.Exit<A, Option.Option<E2>>>, E | E2, E, unknown, unknown, R>
  )

Source from the content-addressed store, hash-verified

2913 self: Stream.Stream<Exit.Exit<A, Option.Option<E2>>, E, R>
2914): Stream.Stream<A, E | E2, R> => {
// processChunk: handles one incoming chunk of `Exit` values.
//
// It writes the values of the leading run of successful exits downstream and
// then decides how the channel proceeds, based on the first non-successful
// exit in the chunk (if any).
//
// Parameters:
//   chunk - the chunk of `Exit<A, Option<E2>>` elements to process.
//   cont  - the channel to continue with when the chunk contains no
//           non-successful exit.
// Returns: a channel that first writes the collected success values and then
//          runs the selected continuation (`next`).
2915 const processChunk = (
2916 chunk: Chunk.Chunk<Exit.Exit<A, Option.Option<E2>>>,
2917 cont: Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<Exit.Exit<A, Option.Option<E2>>>, E | E2, E, unknown, unknown, R>
2918 ) => {
// Split at the first exit that is not a success. Presumably `toEmit` holds the
// elements before that point and `rest` starts at it (verify against
// `Chunk.splitWhere`).
2919 const [toEmit, rest] = pipe(chunk, Chunk.splitWhere((exit) => !Exit.isSuccess(exit)))
// Choose what runs after the write, from the first element of `rest`.
2920 const next = pipe(
2921 Chunk.head(rest),
2922 Option.match({
// `rest` is empty: nothing stopped the chunk, so carry on with `cont`.
2923 onNone: () => cont,
2924 onSome: Exit.match({
// Failure: `Cause.flipCauseOption` yields an Option. `None` finishes with
// `core.void`; `Some(cause)` fails the channel with that cause.
// NOTE(review): presumably `None` corresponds to a failure carrying
// `Option.none` (a plain end signal) -- confirm against `Cause.flipCauseOption`.
2925 onFailure: (cause) =>
2926 Option.match(Cause.flipCauseOption(cause), {
2927 onNone: () => core.void,
2928 onSome: core.failCause
2929 }),
// NOTE(review): the head of `rest` should be a non-success given the split
// predicate above, so this branch looks unreachable in practice; it finishes
// with `core.void` all the same.
2930 onSuccess: () => core.void
2931 })
2932 })
2933 )
2934 return pipe(
// Unwrap the success values from `toEmit` and write them as a single chunk.
2935 core.write(pipe(
2936 toEmit,
2937 Chunk.filterMap((exit) =>
2938 Exit.isSuccess(exit) ?
2939 Option.some(exit.value) :
2940 Option.none()
2941 )
2942 )),
// After writing, continue with the continuation selected above.
2943 core.flatMap(() => next)
2944 )
2945 }
2946 const process: Channel.Channel<
2947 Chunk.Chunk<A>,
2948 Chunk.Chunk<Exit.Exit<A, Option.Option<E2>>>,

Callers 1

flattenExitOption (Function) · 0.85

Calls 3

head (Method) · 0.80
pipe (Function) · 0.70
write (Method) · 0.65

Tested by

no test coverage detected