| 77 | * @category constructors |
| 78 | */ |
| 79 | export const fromDuplex = <RO>( |
| 80 | open: Effect.Effect<Duplex, Socket.SocketError, RO>, |
| 81 | options?: { |
| 82 | readonly openTimeout?: Duration.DurationInput | undefined |
| 83 | } |
| 84 | ): Effect.Effect<Socket.Socket, never, Exclude<RO, Scope.Scope>> => |
| 85 | Effect.withFiberRuntime<Socket.Socket, never, Exclude<RO, Scope.Scope>>((fiber) => { |
| 86 | let currentSocket: Duplex | undefined |
| 87 | const latch = Effect.unsafeMakeLatch(false) |
| 88 | const openContext = fiber.currentContext as Context.Context<RO> |
| 89 | const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void, opts?: { |
| 90 | readonly onOpen?: Effect.Effect<void> | undefined |
| 91 | }) => |
| 92 | Effect.scopedWith(Effect.fnUntraced(function*(scope) { |
| 93 | const fiberSet = yield* FiberSet.make<any, E | Socket.SocketError>().pipe( |
| 94 | Scope.extend(scope) |
| 95 | ) |
| 96 | |
| 97 | let conn: Duplex | undefined = undefined |
| 98 | yield* Scope.addFinalizer( |
| 99 | scope, |
| 100 | Effect.sync(() => { |
| 101 | if (!conn) return |
| 102 | conn.off("data", onData) |
| 103 | conn.off("end", onEnd) |
| 104 | conn.off("error", onError) |
| 105 | conn.off("close", onClose) |
| 106 | }) |
| 107 | ) |
| 108 | |
| 109 | conn = yield* Scope.extend(open, scope).pipe( |
| 110 | options?.openTimeout ? |
| 111 | Effect.timeoutFail({ |
| 112 | duration: options.openTimeout, |
| 113 | onTimeout: () => |
| 114 | new Socket.SocketGenericError({ reason: "Open", cause: new Error("Connection timed out") }) |
| 115 | }) : |
| 116 | identity |
| 117 | ) |
| 118 | conn.on("end", onEnd) |
| 119 | conn.on("error", onError) |
| 120 | conn.on("close", onClose) |
| 121 | |
| 122 | const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), NetSocket, conn as Net.Socket) |
| 123 | conn.on("data", onData) |
| 124 | |
| 125 | currentSocket = conn |
| 126 | yield* latch.open |
| 127 | if (opts?.onOpen) yield* opts.onOpen |
| 128 | |
| 129 | return yield* FiberSet.join(fiberSet) |
| 130 | |
/**
 * Listener attached to the connection's "data" event.
 *
 * Each incoming chunk is handed to the caller-supplied `handler`. The handler
 * may return nothing (it handled the chunk synchronously) or an Effect; only
 * in the latter case is the value passed to `run`, the runner created from
 * the enclosing `fiberSet`.
 */
function onData(chunk: Uint8Array) {
  const outcome = handler(chunk)
  // Non-Effect results (e.g. `void`) need no further work.
  if (!Effect.isEffect(outcome)) return
  run(outcome)
}