MCPcopy Index your code
hub / github.com/Effect-TS/effect / makeUpgradeHandler

Function makeUpgradeHandler

packages/platform-node/src/internal/httpServer.ts:161–209  ·  view source on GitHub ↗
(
  lazyWss: Effect.Effect<WS.WebSocketServer>,
  httpApp: App.Default<E, R>,
  middleware?: Middleware.HttpMiddleware
)

Source from the content-addressed store, hash-verified

159
160/** @internal */
161export const makeUpgradeHandler = <R, E>(
162 lazyWss: Effect.Effect<WS.WebSocketServer>,
163 httpApp: App.Default<E, R>,
164 middleware?: Middleware.HttpMiddleware
165) => {
166 const handledApp = App.toHandled(httpApp, handleResponse, middleware)
167 return Effect.map(FiberSet.makeRuntime<R>(), (runFork) =>
168 function handler(
169 nodeRequest: Http.IncomingMessage,
170 socket: Duplex,
171 head: Buffer
172 ) {
173 let nodeResponse_: Http.ServerResponse | undefined = undefined
174 const nodeResponse = () => {
175 if (nodeResponse_ === undefined) {
176 nodeResponse_ = new Http.ServerResponse(nodeRequest)
177 nodeResponse_.assignSocket(socket as any)
178 nodeResponse_.on("finish", () => {
179 socket.end()
180 })
181 }
182 return nodeResponse_
183 }
184 const upgradeEffect = Socket.fromWebSocket(Effect.flatMap(
185 lazyWss,
186 (wss) =>
187 Effect.acquireRelease(
188 Effect.async<globalThis.WebSocket>((resume) =>
189 wss.handleUpgrade(nodeRequest, socket, head, (ws) => {
190 resume(Effect.succeed(ws as any))
191 })
192 ),
193 (ws) => Effect.sync(() => ws.close())
194 )
195 ))
196 const fiber = runFork(
197 Effect.provideService(
198 handledApp,
199 ServerRequest.HttpServerRequest,
200 new ServerRequestImpl(nodeRequest, nodeResponse, upgradeEffect)
201 )
202 )
203 socket.on("close", () => {
204 if (!socket.writableEnded) {
205 fiber.unsafeInterruptAsFork(Error.clientAbortFiberId)
206 }
207 })
208 })
209}
210
211class ServerRequestImpl extends HttpIncomingMessageImpl<Error.RequestError> implements ServerRequest.HttpServerRequest {
212 readonly [ServerRequest.TypeId]: ServerRequest.TypeId

Callers 1

makeFunction · 0.85

Calls 6

syncMethod · 0.80
mapMethod · 0.65
closeMethod · 0.65
unsafeInterruptAsForkMethod · 0.65
resumeFunction · 0.50
runForkFunction · 0.50

Tested by

no test coverage detected