MCPcopy
hub / github.com/Effect-TS/effect / runRaw

Function runRaw

packages/platform/src/Socket.ts:408–489  ·  view source on GitHub ↗
(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
      readonly onOpen?: Effect.Effect<void> | undefined
    })

Source from the content-addressed store, hash-verified

406 const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
407
408 const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void, opts?: {
409 readonly onOpen?: Effect.Effect<void> | undefined
410 }) =>
411 Effect.scopedWith(Effect.fnUntraced(function*(scope) {
412 const fiberSet = yield* FiberSet.make<any, E | SocketError>().pipe(
413 Scope.extend(scope)
414 )
415 const ws = yield* Scope.extend(acquire, scope)
416 const run = yield* Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws)
417 let open = false
418
419 function onMessage(event: MessageEvent) {
420 if (event.data instanceof Blob) {
421 return Effect.promise(() => event.data.arrayBuffer() as Promise<ArrayBuffer>).pipe(
422 Effect.andThen((buffer) => handler(new Uint8Array(buffer))),
423 run
424 )
425 }
426 const result = handler(event.data)
427 if (Effect.isEffect(result)) {
428 run(result)
429 }
430 }
431 function onError(cause: Event) {
432 ws.removeEventListener("message", onMessage)
433 ws.removeEventListener("close", onClose)
434 Deferred.unsafeDone(
435 fiberSet.deferred,
436 Effect.fail(new SocketGenericError({ reason: open ? "Read" : "Open", cause }))
437 )
438 }
439 function onClose(event: globalThis.CloseEvent) {
440 ws.removeEventListener("message", onMessage)
441 ws.removeEventListener("error", onError)
442 Deferred.unsafeDone(
443 fiberSet.deferred,
444 Effect.fail(
445 new SocketCloseError({
446 reason: "Close",
447 code: event.code,
448 closeReason: event.reason
449 })
450 )
451 )
452 }
453
454 ws.addEventListener("close", onClose, { once: true })
455 ws.addEventListener("error", onError, { once: true })
456 ws.addEventListener("message", onMessage)
457
458 if (ws.readyState !== 1) {
459 const openDeferred = Deferred.unsafeMake<void>(fiber.id())
460 ws.addEventListener("open", () => {
461 open = true
462 Deferred.unsafeDone(openDeferred, Effect.void)
463 }, { once: true })
464 yield* Deferred.await(openDeferred).pipe(
465 Effect.timeoutFail({

Callers 2

run (Function) · 0.70
run (Method) · 0.50

Calls 15

isSocketError (Function) · 0.85
runtime (Method) · 0.80
addEventListener (Method) · 0.80
unsafeDone (Method) · 0.80
isClean (Method) · 0.80
merge (Method) · 0.80
sync (Method) · 0.80
unsafeClose (Method) · 0.80
read (Method) · 0.80
handler (Function) · 0.70
pipe (Method) · 0.65
make (Method) · 0.65

Tested by

no test coverage detected