MCPcopy
hub / github.com/Effect-TS/effect / fromDuplex

Function fromDuplex

packages/platform-node-shared/src/NodeSocket.ts:79–197  ·  view source on GitHub ↗
(
  open: Effect.Effect<Duplex, Socket.SocketError, RO>,
  options?: {
    readonly openTimeout?: Duration.DurationInput | undefined
  }
)

Source from the content-addressed store, hash-verified

77 * @category constructors
78 */
79export const fromDuplex = <RO>(
80 open: Effect.Effect<Duplex, Socket.SocketError, RO>,
81 options?: {
82 readonly openTimeout?: Duration.DurationInput | undefined
83 }
84): Effect.Effect<Socket.Socket, never, Exclude<RO, Scope.Scope>> =>
85 Effect.withFiberRuntime<Socket.Socket, never, Exclude<RO, Scope.Scope>>((fiber) => {
86 let currentSocket: Duplex | undefined
87 const latch = Effect.unsafeMakeLatch(false)
88 const openContext = fiber.currentContext as Context.Context<RO>
89 const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
90 readonly onOpen?: Effect.Effect<void> | undefined
91 }) =>
92 Effect.scopedWith(Effect.fnUntraced(function*(scope) {
93 const fiberSet = yield* FiberSet.make<any, E | Socket.SocketError>().pipe(
94 Scope.extend(scope)
95 )
96
97 let conn: Duplex | undefined = undefined
98 yield* Scope.addFinalizer(
99 scope,
100 Effect.sync(() => {
101 if (!conn) return
102 conn.off("data", onData)
103 conn.off("end", onEnd)
104 conn.off("error", onError)
105 conn.off("close", onClose)
106 })
107 )
108
109 conn = yield* Scope.extend(open, scope).pipe(
110 options?.openTimeout ?
111 Effect.timeoutFail({
112 duration: options.openTimeout,
113 onTimeout: () =>
114 new Socket.SocketGenericError({ reason: "Open", cause: new Error("Connection timed out") })
115 }) :
116 identity
117 )
118 conn.on("end", onEnd)
119 conn.on("error", onError)
120 conn.on("close", onClose)
121
122 const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), NetSocket, conn as Net.Socket)
123 conn.on("data", onData)
124
125 currentSocket = conn
126 yield* latch.open
127 if (opts?.onOpen) yield* opts.onOpen
128
129 return yield* FiberSet.join(fiberSet)
130
// Listener for the connection's "data" event: it is attached with
// `conn.on("data", onData)` and detached again by the scope finalizer.
// It is declared after the `return` statement, so those earlier references
// rely on function-declaration hoisting.
131 function onData(chunk: Uint8Array) {
132 const result = handler(chunk)
// `handler` may return void; only an Effect result is passed on to `run`
// (presumably the runner obtained from `FiberSet.runtime(fiberSet)` above,
// not the outer `run` - confirm against the full file's scoping).
133 if (Effect.isEffect(result)) {
134 run(result)
135 }
136 }

Callers 1

makeNetFunction · 0.70

Calls 3

syncMethod · 0.80
endMethod · 0.65
ofMethod · 0.65

Tested by

no test coverage detected