| 393 | * @category constructors |
| 394 | */ |
| 395 | export const fromWebSocket = <RO>( |
| 396 | acquire: Effect.Effect<globalThis.WebSocket, SocketError, RO>, |
| 397 | options?: { |
| 398 | readonly closeCodeIsError?: (code: number) => boolean |
| 399 | readonly openTimeout?: DurationInput |
| 400 | } |
| 401 | ): Effect.Effect<Socket, never, Exclude<RO, Scope.Scope>> => |
| 402 | Effect.withFiberRuntime((fiber) => { |
| 403 | let currentWS: globalThis.WebSocket | undefined |
| 404 | const latch = Effect.unsafeMakeLatch(false) |
| 405 | const acquireContext = fiber.currentContext as Context.Context<RO> |
| 406 | const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError |
| 407 | |
| 408 | const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void, opts?: { |
| 409 | readonly onOpen?: Effect.Effect<void> | undefined |
| 410 | }) => |
| 411 | Effect.scopedWith(Effect.fnUntraced(function*(scope) { |
| 412 | const fiberSet = yield* FiberSet.make<any, E | SocketError>().pipe( |
| 413 | Scope.extend(scope) |
| 414 | ) |
| 415 | const ws = yield* Scope.extend(acquire, scope) |
| 416 | const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws) |
| 417 | let open = false |
| 418 | |
/**
 * Handles a WebSocket `message` event by passing its payload to `handler`.
 *
 * `handler` accepts `string | Uint8Array`, so binary payloads are normalised
 * before it is called:
 * - a `Blob` is read asynchronously into a `Uint8Array`;
 * - an `ArrayBuffer` (what the socket delivers when its `binaryType` is
 *   `"arraybuffer"`) is wrapped in a `Uint8Array` view.
 * Any other payload is forwarded unchanged.
 *
 * If `handler` returns an effect, it is executed with `run`.
 */
function onMessage(event: MessageEvent) {
  if (event.data instanceof Blob) {
    // Reading a Blob is asynchronous, so the handler call is chained onto the
    // read and the combined effect is handed to `run`.
    return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe(
      Effect.andThen((buffer) => handler(new Uint8Array(buffer))),
      run
    )
  }
  // `new Uint8Array(buffer)` creates a view over the same memory; nothing is copied.
  const payload = event.data instanceof ArrayBuffer ? new Uint8Array(event.data) : event.data
  const result = handler(payload)
  // The handler may return void; only effects need to be run.
  if (Effect.isEffect(result)) {
    run(result)
  }
}
/**
 * Handles the WebSocket `error` event.
 *
 * Detaches the `message` and `close` listeners from `ws`, then completes
 * `fiberSet.deferred` with a failed effect carrying a `SocketGenericError`.
 * The error's reason is "Read" when `open` is true and "Open" otherwise; the
 * DOM event is attached as the cause.
 */
function onError(errorEvent: Event) {
  const reason = open ? "Read" : "Open"
  ws.removeEventListener("close", onClose)
  ws.removeEventListener("message", onMessage)
  const failure = Effect.fail(new SocketGenericError({ reason, cause: errorEvent }))
  Deferred.unsafeDone(fiberSet.deferred, failure)
}
/**
 * Handles the WebSocket `close` event.
 *
 * Detaches the `message` and `error` listeners from `ws`, then completes
 * `fiberSet.deferred` with a failed effect carrying a `SocketCloseError` built
 * from the event's close code and reason.
 */
function onClose(event: globalThis.CloseEvent) {
  const { code, reason: closeReason } = event
  ws.removeEventListener("error", onError)
  ws.removeEventListener("message", onMessage)
  const closeError = new SocketCloseError({ reason: "Close", code, closeReason })
  Deferred.unsafeDone(fiberSet.deferred, Effect.fail(closeError))
}