(self: MessagePort | Window)
| 24 | |
| 25 | /** @internal */ |
| 26 | export const make = (self: MessagePort | Window) => |
| 27 | Runner.PlatformRunner.of({ |
| 28 | [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, |
| 29 | start: Effect.fnUntraced(function*<I, O>(closeLatch: Deferred.Deferred<void, WorkerError>) { |
| 30 | const disconnects = yield* Mailbox.make<number>() |
| 31 | let currentPortId = 0 |
| 32 | |
| 33 | const ports = new Map<number, readonly [MessagePort, Scope.CloseableScope]>() |
| 34 | const send = (portId: number, message: O, transfer?: ReadonlyArray<unknown>) => /* Builds an effect that posts the tuple [1, message] to the port registered under portId, falling back to self when no port is registered for that id. */ |
| 35 | Effect.sync(() => { /* the post is deferred: it happens when the returned effect is run, not when send is called */ |
| 36 | ;(ports.get(portId)?.[0] ?? self).postMessage([1, message], { |
| 37 | transfer: transfer as any /* forwarded as the transfer option of postMessage; the cast bypasses type checking of the list */ |
| 38 | }) |
| 39 | }) |
| 40 | |
| 41 | const run = Effect.fnUntraced(function*<A, E, R>( |
| 42 | handler: (portId: number, message: I) => Effect.Effect<A, E, R> | void |
| 43 | ) { |
| 44 | const scope = yield* Effect.scope |
| 45 | const runtime = (yield* Effect.interruptible(Effect.runtime<R | Scope.Scope>())).pipe( |
| 46 | Runtime.updateContext(Context.omit(Scope.Scope)) |
| 47 | ) as Runtime.Runtime<R> |
| 48 | const fiberSet = yield* FiberSet.make<any, WorkerError | E>() |
| 49 | const runFork = Runtime.runFork(runtime) |
| 50 | function onExit(exit: Exit.Exit<any, E>) { /* Exit callback: completes closeLatch with a defect when the given exit is a failure that is not purely an interruption. */ |
| 51 | if (exit._tag === "Failure" && !Cause.isInterruptedOnly(exit.cause)) { /* successful exits and interruption-only causes are ignored */ |
| 52 | Deferred.unsafeDone(closeLatch, Exit.die(Cause.squash(exit.cause))) /* squash the cause into a single value and complete the latch by dying with it */ |
| 53 | } |
| 54 | } |
| 55 | |
| 56 | function onMessage(portId: number) { /* Builds the message listener for the port identified by portId. */ |
| 57 | return function(event: MessageEvent) { |
| 58 | const message = event.data as Runner.BackingRunner.Message<I> /* unchecked cast: event.data is assumed to already be a tagged message tuple */ |
| 59 | if (message[0] === 0) { /* tag 0: the payload in message[1] is handed to handler */ |
| 60 | const result = handler(portId, message[1]) |
| 61 | if (Effect.isEffect(result)) { /* handler may return void; only Effect results are forked */ |
| 62 | const fiber = runFork(result) |
| 63 | fiber.addObserver(onExit) /* onExit is invoked with the fiber's exit */ |
| 64 | FiberSet.unsafeAdd(fiberSet, fiber) /* register the fiber in fiberSet */ |
| 65 | } |
| 66 | } else { /* any other tag is handled as a request to close this port - NOTE(review): presumably the close signal of the message protocol; confirm */ |
| 67 | const port = ports.get(portId) |
| 68 | if (!port) { /* no port registered under this id: nothing to close */ |
| 69 | return |
| 70 | } else if (ports.size === 1) { |
| 71 | // let the last port close with the outer scope |
| 72 | return Deferred.unsafeDone(closeLatch, Exit.void) /* completes closeLatch successfully; the port entry and its scope are left untouched here */ |
| 73 | } |
| 74 | ports.delete(portId) /* other ports remain: drop this entry */ |
| 75 | Effect.runFork(Scope.close(port[1], Exit.void)) /* close the port's scope in a separately forked effect */ |
| 76 | } |
| 77 | } |
| 78 | } |
| 79 | function onMessageError(error: MessageEvent) { |
| 80 | Deferred.unsafeDone( |
| 81 | closeLatch, |
| 82 | new WorkerError({ reason: "decode", cause: error.data }) |
| 83 | ) |
no test coverage detected