(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
readonly onOpen?: Effect.Effect<void> | undefined
})
| 87 | const latch = Effect.unsafeMakeLatch(false) |
| 88 | const openContext = fiber.currentContext as Context.Context<RO> |
| 89 | const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void, opts?: { |
| 90 | readonly onOpen?: Effect.Effect<void> | undefined |
| 91 | }) => |
| 92 | Effect.scopedWith(Effect.fnUntraced(function*(scope) { |
| 93 | const fiberSet = yield* FiberSet.make<any, E | Socket.SocketError>().pipe( |
| 94 | Scope.extend(scope) |
| 95 | ) |
| 96 | |
| 97 | let conn: Duplex | undefined = undefined |
| 98 | yield* Scope.addFinalizer( |
| 99 | scope, |
| 100 | Effect.sync(() => { |
| 101 | if (!conn) return |
| 102 | conn.off("data", onData) |
| 103 | conn.off("end", onEnd) |
| 104 | conn.off("error", onError) |
| 105 | conn.off("close", onClose) |
| 106 | }) |
| 107 | ) |
| 108 | |
| 109 | conn = yield* Scope.extend(open, scope).pipe( |
| 110 | options?.openTimeout ? |
| 111 | Effect.timeoutFail({ |
| 112 | duration: options.openTimeout, |
| 113 | onTimeout: () => |
| 114 | new Socket.SocketGenericError({ reason: "Open", cause: new Error("Connection timed out") }) |
| 115 | }) : |
| 116 | identity |
| 117 | ) |
| 118 | conn.on("end", onEnd) |
| 119 | conn.on("error", onError) |
| 120 | conn.on("close", onClose) |
| 121 | |
| 122 | const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), NetSocket, conn as Net.Socket) |
| 123 | conn.on("data", onData) |
| 124 | |
| 125 | currentSocket = conn |
| 126 | yield* latch.open |
| 127 | if (opts?.onOpen) yield* opts.onOpen |
| 128 | |
| 129 | return yield* FiberSet.join(fiberSet) |
| 130 | |
| 131 | function onData(chunk: Uint8Array) { |
| 132 | const result = handler(chunk) |
| 133 | if (Effect.isEffect(result)) { |
| 134 | run(result) |
| 135 | } |
| 136 | } |
| 137 | function onEnd() { |
| 138 | Deferred.unsafeDone(fiberSet.deferred, Effect.void) |
| 139 | } |
| 140 | function onError(cause: Error) { |
| 141 | Deferred.unsafeDone( |
| 142 | fiberSet.deferred, |
| 143 | Effect.fail(new Socket.SocketGenericError({ reason: "Read", cause })) |
| 144 | ) |
| 145 | } |
| 146 | function onClose(hadError: boolean) { |
no test coverage detected