MCPcopy
hub / github.com/Effect-TS/effect / run

Function run

packages/platform-node-shared/src/NodeSocket.ts:89–164  ·  view source on GitHub ↗
(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
      readonly onOpen?: Effect.Effect<void> | undefined
    })

Source from the content-addressed store, hash-verified

87 const latch = Effect.unsafeMakeLatch(false)
88 const openContext = fiber.currentContext as Context.Context<RO>
89 const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
90 readonly onOpen?: Effect.Effect<void> | undefined
91 }) =>
92 Effect.scopedWith(Effect.fnUntraced(function*(scope) {
93 const fiberSet = yield* FiberSet.make<any, E | Socket.SocketError>().pipe(
94 Scope.extend(scope)
95 )
96
97 let conn: Duplex | undefined = undefined
98 yield* Scope.addFinalizer(
99 scope,
100 Effect.sync(() => {
101 if (!conn) return
102 conn.off("data", onData)
103 conn.off("end", onEnd)
104 conn.off("error", onError)
105 conn.off("close", onClose)
106 })
107 )
108
109 conn = yield* Scope.extend(open, scope).pipe(
110 options?.openTimeout ?
111 Effect.timeoutFail({
112 duration: options.openTimeout,
113 onTimeout: () =>
114 new Socket.SocketGenericError({ reason: "Open", cause: new Error("Connection timed out") })
115 }) :
116 identity
117 )
118 conn.on("end", onEnd)
119 conn.on("error", onError)
120 conn.on("close", onClose)
121
122 const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), NetSocket, conn as Net.Socket)
123 conn.on("data", onData)
124
125 currentSocket = conn
126 yield* latch.open
127 if (opts?.onOpen) yield* opts.onOpen
128
129 return yield* FiberSet.join(fiberSet)
130
131 function onData(chunk: Uint8Array) {
132 const result = handler(chunk)
133 if (Effect.isEffect(result)) {
134 run(result)
135 }
136 }
137 function onEnd() {
138 Deferred.unsafeDone(fiberSet.deferred, Effect.void)
139 }
140 function onError(cause: Error) {
141 Deferred.unsafeDone(
142 fiberSet.deferred,
143 Effect.fail(new Socket.SocketGenericError({ reason: "Read", cause }))
144 )
145 }
146 function onClose(hadError: boolean) {

Callers 2

onDataFunction · 0.70
runRawMethod · 0.50

Calls 8

syncMethod · 0.80
runtimeMethod · 0.80
mergeMethod · 0.80
unsafeCloseMethod · 0.80
pipeMethod · 0.65
makeMethod · 0.65
addFinalizerMethod · 0.65
joinMethod · 0.65

Tested by

no test coverage detected