MCPcopy Index your code
hub / github.com/Effect-TS/effect / readUpstream

Function readUpstream

packages/effect/src/internal/channel/channelExecutor.ts:1018–1093  ·  view source on GitHub ↗
(
  r: ChannelState.Read,
  onSuccess: () => Effect.Effect<A, E2, R>,
  onFailure: (cause: Cause.Cause<E>) => Effect.Effect<A, E2, R>
)

Source from the content-addressed store, hash-verified

1016 * @internal
1017 */
1018export const readUpstream = <A, E2, R, E>(
1019 r: ChannelState.Read,
1020 onSuccess: () => Effect.Effect<A, E2, R>,
1021 onFailure: (cause: Cause.Cause<E>) => Effect.Effect<A, E2, R>
1022): Effect.Effect<A, E2, R> => {
1023 const readStack = [r as ChannelState.Read]
1024 const read = (): Effect.Effect<A, E2, R> => {
1025 const current = readStack.pop()
1026 if (current === undefined || current.upstream === undefined) {
1027 return Effect.dieMessage("Unexpected end of input for channel execution")
1028 }
1029 const state = current.upstream.run() as ChannelState.Primitive
1030 switch (state._tag) {
1031 case ChannelStateOpCodes.OP_EMIT: {
1032 const emitEffect = current.onEmit(current.upstream.getEmit())
1033 if (readStack.length === 0) {
1034 if (emitEffect === undefined) {
1035 return Effect.suspend(onSuccess)
1036 }
1037 return pipe(
1038 emitEffect as Effect.Effect<void>,
1039 Effect.matchCauseEffect({ onFailure, onSuccess })
1040 )
1041 }
1042 if (emitEffect === undefined) {
1043 return Effect.suspend(() => read())
1044 }
1045 return pipe(
1046 emitEffect as Effect.Effect<void>,
1047 Effect.matchCauseEffect({ onFailure, onSuccess: () => read() })
1048 )
1049 }
1050
1051 case ChannelStateOpCodes.OP_DONE: {
1052 const doneEffect = current.onDone(current.upstream.getDone())
1053 if (readStack.length === 0) {
1054 if (doneEffect === undefined) {
1055 return Effect.suspend(onSuccess)
1056 }
1057 return pipe(
1058 doneEffect as Effect.Effect<void>,
1059 Effect.matchCauseEffect({ onFailure, onSuccess })
1060 )
1061 }
1062 if (doneEffect === undefined) {
1063 return Effect.suspend(() => read())
1064 }
1065 return pipe(
1066 doneEffect as Effect.Effect<void>,
1067 Effect.matchCauseEffect({ onFailure, onSuccess: () => read() })
1068 )
1069 }
1070
1071 case ChannelStateOpCodes.OP_FROM_EFFECT: {
1072 readStack.push(current)
1073 return pipe(
1074 current.onEffect(state.effect as Effect.Effect<void>) as Effect.Effect<void>,
1075 Effect.catchAllCause((cause) =>

Callers 2

drainerMethod · 0.85
channelExecutor.tsFile · 0.85

Calls 1

readFunction · 0.70

Tested by

no test coverage detected