MCPcopy
hub / github.com/Effect-TS/effect / make

Function make

packages/platform-browser/src/internal/workerRunner.ts:26–149  ·  view source on GitHub ↗
(self: MessagePort | Window)

Source from the content-addressed store, hash-verified

24
25/** @internal */
26export const make = (self: MessagePort | Window) =>
27 Runner.PlatformRunner.of({
28 [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
29 start: Effect.fnUntraced(function*<I, O>(closeLatch: Deferred.Deferred<void, WorkerError>) {
30 const disconnects = yield* Mailbox.make<number>()
31 let currentPortId = 0
32
33 const ports = new Map<number, readonly [MessagePort, Scope.CloseableScope]>()
34 const send = (portId: number, message: O, transfer?: ReadonlyArray<unknown>) =>
35 Effect.sync(() => {
36 ;(ports.get(portId)?.[0] ?? self).postMessage([1, message], {
37 transfer: transfer as any
38 })
39 })
40
41 const run = Effect.fnUntraced(function*<A, E, R>(
42 handler: (portId: number, message: I) => Effect.Effect<A, E, R> | void
43 ) {
44 const scope = yield* Effect.scope
45 const runtime = (yield* Effect.interruptible(Effect.runtime<R | Scope.Scope>())).pipe(
46 Runtime.updateContext(Context.omit(Scope.Scope))
47 ) as Runtime.Runtime<R>
48 const fiberSet = yield* FiberSet.make<any, WorkerError | E>()
49 const runFork = Runtime.runFork(runtime)
50 function onExit(exit: Exit.Exit<any, E>) {
51 if (exit._tag === "Failure" && !Cause.isInterruptedOnly(exit.cause)) {
52 Deferred.unsafeDone(closeLatch, Exit.die(Cause.squash(exit.cause)))
53 }
54 }
55
56 function onMessage(portId: number) {
57 return function(event: MessageEvent) {
58 const message = event.data as Runner.BackingRunner.Message<I>
59 if (message[0] === 0) {
60 const result = handler(portId, message[1])
61 if (Effect.isEffect(result)) {
62 const fiber = runFork(result)
63 fiber.addObserver(onExit)
64 FiberSet.unsafeAdd(fiberSet, fiber)
65 }
66 } else {
67 const port = ports.get(portId)
68 if (!port) {
69 return
70 } else if (ports.size === 1) {
71 // let the last port close with the outer scope
72 return Deferred.unsafeDone(closeLatch, Exit.void)
73 }
74 ports.delete(portId)
75 Effect.runFork(Scope.close(port[1], Exit.void))
76 }
77 }
78 }
79 function onMessageError(error: MessageEvent) {
80 Deferred.unsafeDone(
81 closeLatch,
82 new WorkerError({ reason: "decode", cause: error.data })
83 )

Callers 2

layerMessagePort · Function · 0.70
workerRunner.ts · File · 0.70

Calls 13

handlePort · Function · 0.85
identity · Function · 0.85
runtime · Method · 0.80
addEventListener · Method · 0.80
clear · Method · 0.80
sync · Method · 0.80
removeEventListener · Method · 0.80
of · Method · 0.65
make · Method · 0.65
pipe · Method · 0.65
omit · Method · 0.65
addFinalizer · Method · 0.65

Tested by

no test coverage detected