( r: ChannelState.Read, onSuccess: () => Effect.Effect<A, E2, R>, onFailure: (cause: Cause.Cause<E>) => Effect.Effect<A, E2, R> )
| 1016 | * @internal |
| 1017 | */ |
| 1018 | export const readUpstream = <A, E2, R, E>( |
| 1019 | r: ChannelState.Read, |
| 1020 | onSuccess: () => Effect.Effect<A, E2, R>, |
| 1021 | onFailure: (cause: Cause.Cause<E>) => Effect.Effect<A, E2, R> |
| 1022 | ): Effect.Effect<A, E2, R> => { |
| 1023 | const readStack = [r as ChannelState.Read] |
| 1024 | const read = (): Effect.Effect<A, E2, R> => { |
| 1025 | const current = readStack.pop() |
| 1026 | if (current === undefined || current.upstream === undefined) { |
| 1027 | return Effect.dieMessage("Unexpected end of input for channel execution") |
| 1028 | } |
| 1029 | const state = current.upstream.run() as ChannelState.Primitive |
| 1030 | switch (state._tag) { |
| 1031 | case ChannelStateOpCodes.OP_EMIT: { |
| 1032 | const emitEffect = current.onEmit(current.upstream.getEmit()) |
| 1033 | if (readStack.length === 0) { |
| 1034 | if (emitEffect === undefined) { |
| 1035 | return Effect.suspend(onSuccess) |
| 1036 | } |
| 1037 | return pipe( |
| 1038 | emitEffect as Effect.Effect<void>, |
| 1039 | Effect.matchCauseEffect({ onFailure, onSuccess }) |
| 1040 | ) |
| 1041 | } |
| 1042 | if (emitEffect === undefined) { |
| 1043 | return Effect.suspend(() => read()) |
| 1044 | } |
| 1045 | return pipe( |
| 1046 | emitEffect as Effect.Effect<void>, |
| 1047 | Effect.matchCauseEffect({ onFailure, onSuccess: () => read() }) |
| 1048 | ) |
| 1049 | } |
| 1050 | |
| 1051 | case ChannelStateOpCodes.OP_DONE: { |
| 1052 | const doneEffect = current.onDone(current.upstream.getDone()) |
| 1053 | if (readStack.length === 0) { |
| 1054 | if (doneEffect === undefined) { |
| 1055 | return Effect.suspend(onSuccess) |
| 1056 | } |
| 1057 | return pipe( |
| 1058 | doneEffect as Effect.Effect<void>, |
| 1059 | Effect.matchCauseEffect({ onFailure, onSuccess }) |
| 1060 | ) |
| 1061 | } |
| 1062 | if (doneEffect === undefined) { |
| 1063 | return Effect.suspend(() => read()) |
| 1064 | } |
| 1065 | return pipe( |
| 1066 | doneEffect as Effect.Effect<void>, |
| 1067 | Effect.matchCauseEffect({ onFailure, onSuccess: () => read() }) |
| 1068 | ) |
| 1069 | } |
| 1070 | |
| 1071 | case ChannelStateOpCodes.OP_FROM_EFFECT: { |
| 1072 | readStack.push(current) |
| 1073 | return pipe( |
| 1074 | current.onEffect(state.effect as Effect.Effect<void>) as Effect.Effect<void>, |
| 1075 | Effect.catchAllCause((cause) => |
no test coverage detected