| 159 | |
| 160 | /* Builds an Effect that yields a handler taking (nodeRequest, socket, head). Each handler call forks the handled httpApp with a ServerRequestImpl as the HttpServerRequest service. Presumably registered on a Node server "upgrade" event - confirm at the call site. */ /** @internal */ |
| 161 | export const makeUpgradeHandler = <R, E>( |
| 162 | lazyWss: Effect.Effect<WS.WebSocketServer>, /* effect yielding the server whose handleUpgrade is called below */ |
| 163 | httpApp: App.Default<E, R>, /* application that is run once per handler call */ |
| 164 | middleware?: Middleware.HttpMiddleware /* optional; forwarded unchanged to App.toHandled */ |
| 165 | ) => { |
| 166 | const handledApp = App.toHandled(httpApp, handleResponse, middleware) /* built once, outside the handler, and reused by every call */ |
| 167 | return Effect.map(FiberSet.makeRuntime<R>(), (runFork) => /* runFork is the value produced by FiberSet.makeRuntime */ |
| 168 | function handler( |
| 169 | nodeRequest: Http.IncomingMessage, |
| 170 | socket: Duplex, |
| 171 | head: Buffer |
| 172 | ) { |
| 173 | let nodeResponse_: Http.ServerResponse | undefined = undefined /* cache for the lazily created response */ |
| 174 | const nodeResponse = () => { /* creates the ServerResponse on the first call, returns the cached one afterwards */ |
| 175 | if (nodeResponse_ === undefined) { |
| 176 | nodeResponse_ = new Http.ServerResponse(nodeRequest) |
| 177 | nodeResponse_.assignSocket(socket as any) /* bind the response to the raw socket; the cast is needed because socket is typed as Duplex */ |
| 178 | nodeResponse_.on("finish", () => { /* once the response has finished, end the socket */ |
| 179 | socket.end() |
| 180 | }) |
| 181 | } |
| 182 | return nodeResponse_ |
| 183 | } |
| 184 | const upgradeEffect = Socket.fromWebSocket(Effect.flatMap( /* only constructed here; it is handed to ServerRequestImpl below rather than run in this function */ |
| 185 | lazyWss, |
| 186 | (wss) => |
| 187 | Effect.acquireRelease( |
| 188 | Effect.async<globalThis.WebSocket>((resume) => /* bridges the callback-style handleUpgrade into an Effect */ |
| 189 | wss.handleUpgrade(nodeRequest, socket, head, (ws) => { |
| 190 | resume(Effect.succeed(ws as any)) /* complete the async effect with the upgraded socket */ |
| 191 | }) |
| 192 | ), |
| 193 | (ws) => Effect.sync(() => ws.close()) /* release step: close the WebSocket */ |
| 194 | ) |
| 195 | )) |
| 196 | const fiber = runFork( /* start the handled app for this request and keep the fiber for interruption below */ |
| 197 | Effect.provideService( |
| 198 | handledApp, |
| 199 | ServerRequest.HttpServerRequest, |
| 200 | new ServerRequestImpl(nodeRequest, nodeResponse, upgradeEffect) /* nodeResponse is passed as a thunk, not as a response object */ |
| 201 | ) |
| 202 | ) |
| 203 | socket.on("close", () => { |
| 204 | if (!socket.writableEnded) { /* socket closed without end() having been called on it */ |
| 205 | fiber.unsafeInterruptAsFork(Error.clientAbortFiberId) /* interrupt the request fiber using the client-abort fiber id */ |
| 206 | } |
| 207 | }) |
| 208 | }) |
| 209 | } |
| 210 | |
| 211 | class ServerRequestImpl extends HttpIncomingMessageImpl<Error.RequestError> implements ServerRequest.HttpServerRequest { |
| 212 | readonly [ServerRequest.TypeId]: ServerRequest.TypeId |