MCPcopy Index your code
hub / github.com/Effect-TS/effect / ChannelExecutor

Class ChannelExecutor

packages/effect/src/internal/channel/channelExecutor.ts:49–999  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

47
48/** @internal */
49export class ChannelExecutor<
50 out OutElem,
51 in InElem = unknown,
52 out OutErr = never,
53 in InErr = unknown,
54 out OutDone = void,
55 in InDone = unknown,
56 in out Env = never
57> {
58 private _activeSubexecutor: Subexecutor.Subexecutor<Env> | undefined = undefined
59
60 private _cancelled: Exit.Exit<OutErr, OutDone> | undefined = undefined
61
62 private _closeLastSubstream: Effect.Effect<unknown, never, Env> | undefined = undefined
63
64 private _currentChannel: core.Primitive | undefined
65
66 private _done: Exit.Exit<unknown, unknown> | undefined = undefined
67
68 private _doneStack: Array<ErasedContinuation<Env>> = []
69
70 private _emitted: unknown | undefined = undefined
71
72 private _executeCloseLastSubstream: (
73 effect: Effect.Effect<unknown, never, Env>
74 ) => Effect.Effect<unknown, never, Env>
75
76 private _input: ErasedExecutor<Env> | undefined = undefined
77
78 private _inProgressFinalizer: Effect.Effect<unknown, never, Env> | undefined = undefined
79
80 private _providedEnv: Context.Context<unknown> | undefined
81
82 constructor(
83 initialChannel: Channel.Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>,
84 providedEnv: Context.Context<unknown> | undefined,
85 executeCloseLastSubstream: (effect: Effect.Effect<unknown, never, Env>) => Effect.Effect<unknown, never, Env>
86 ) {
87 this._currentChannel = initialChannel as core.Primitive
88 this._executeCloseLastSubstream = executeCloseLastSubstream
89 this._providedEnv = providedEnv
90 }
91
92 run(): ChannelState.ChannelState<unknown, Env> {
93 let result: ChannelState.ChannelState<unknown, Env> | undefined = undefined
94 while (result === undefined) {
95 if (this._cancelled !== undefined) {
96 result = this.processCancellation()
97 } else if (this._activeSubexecutor !== undefined) {
98 result = this.runSubexecutor()
99 } else {
100 try {
101 if (this._currentChannel === undefined) {
102 result = ChannelState.Done()
103 } else {
104 if (Effect.isEffect(this._currentChannel)) {
105 this._currentChannel = core.fromEffect(this._currentChannel) as core.Primitive
106 }

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected