MCPcopy Index your code
hub / github.com/Effect-TS/effect / fromWebSocket

Function fromWebSocket

packages/platform/src/Socket.ts:395–519  ·  view source on GitHub ↗
(
  acquire: Effect.Effect<globalThis.WebSocket, SocketError, RO>,
  options?: {
    readonly closeCodeIsError?: (code: number) => boolean
    readonly openTimeout?: DurationInput
  }
)

Source from the content-addressed store, hash-verified

393 * @category constructors
394 */
395 export const fromWebSocket = <RO>(
396 acquire: Effect.Effect<globalThis.WebSocket, SocketError, RO>,
397 options?: {
398 readonly closeCodeIsError?: (code: number) => boolean
399 readonly openTimeout?: DurationInput
400 }
401): Effect.Effect<Socket, never, Exclude<RO, Scope.Scope>> =>
402 Effect.withFiberRuntime((fiber) => {
403 let currentWS: globalThis.WebSocket | undefined
404 const latch = Effect.unsafeMakeLatch(false)
405 const acquireContext = fiber.currentContext as Context.Context<RO>
406 const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
407
408 const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
409 readonly onOpen?: Effect.Effect<void> | undefined
410 }) =>
411 Effect.scopedWith(Effect.fnUntraced(function*(scope) {
412 const fiberSet = yield* FiberSet.make<any, E | SocketError>().pipe(
413 Scope.extend(scope)
414 )
415 const ws = yield* Scope.extend(acquire, scope)
416 const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws)
417 let open = false
418
419 function onMessage(event: MessageEvent) {
420 if (event.data instanceof Blob) {
421 return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe(
422 Effect.andThen((buffer) => handler(new Uint8Array(buffer))),
423 run
424 )
425 }
426 const result = handler(event.data)
427 if (Effect.isEffect(result)) {
428 run(result)
429 }
430 }
431 function onError(cause: Event) {
432 ws.removeEventListener("message", onMessage)
433 ws.removeEventListener("close", onClose)
434 Deferred.unsafeDone(
435 fiberSet.deferred,
436 Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause }))
437 )
438 }
439 function onClose(event: globalThis.CloseEvent) {
440 ws.removeEventListener("message", onMessage)
441 ws.removeEventListener("error", onError)
442 Deferred.unsafeDone(
443 fiberSet.deferred,
444 Effect.fail(
445 new SocketCloseError({
446 reason: "Close",
447 code: event.code,
448 closeReason: event.reason
449 })
450 )
451 )
452 }

Callers 1

makeWebSocketFunction · 0.70

Calls 1

ofMethod · 0.65

Tested by

no test coverage detected