MCPcopy
hub / github.com/Effect-TS/effect / start

Function start

packages/platform-node/src/internal/workerRunner.ts:16–73  ·  view source on GitHub ↗
(closeLatch: Deferred.Deferred<void, WorkerError>)

Source from the content-addressed store, hash-verified

14const platformRunnerImpl = Runner.PlatformRunner.of({
15 [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
/**
 * Starts the platform runner from inside a Node worker thread or a child
 * process that has an IPC `process.send`.
 *
 * The returned Effect fails with a `WorkerError` (reason "spawn") when neither
 * `WorkerThreads.parentPort` nor `process.send` is available. Otherwise it
 * yields `{ run, send }`.
 *
 * Type parameters: `I` is the message type handed to the `run` handler, `O` is
 * the message type accepted by `send`.
 *
 * @param closeLatch - Deferred completed by this runner: with `Exit.void` when
 * a message whose tag is not `0` arrives, with a defect when a forked handler
 * fails for a reason other than interruption, and with a `WorkerError` when the
 * parent port emits "messageerror" (reason "decode") or "error" (reason
 * "unknown").
 * @returns An Effect producing `run` (installs the message listeners and
 * dispatches incoming messages to a handler) and `send` (posts a message to
 * the parent).
 */
16 start<I, O>(closeLatch: Deferred.Deferred<void, WorkerError>) {
17 return Effect.gen(function*() {
// Guard: there must be a parent to talk to, either a worker-thread port or an
// IPC `process.send`.
18 if (!WorkerThreads.parentPort && !process.send) {
19 return yield* new WorkerError({ reason: "spawn", cause: new Error("not in a worker") })
20 }
21
// Pick the transport once. The `process.send` variant ignores `transfers`.
22 const unsafeSend = WorkerThreads.parentPort
23 ? (message: any, transfers?: any) => WorkerThreads.parentPort!.postMessage(message, transfers)
24 : (message: any, _transfers?: any) => process.send!(message)
// `send` wraps the payload as `[1, message]` and defers the post inside
// `Effect.sync`. The port id argument is unused.
// NOTE(review): tag `1` presumably marks a data message for the parent -
// confirm against the parent-side protocol.
25 const send = (_portId: number, message: O, transfers?: ReadonlyArray<unknown>) =>
26 Effect.sync(() => unsafeSend([1, message], transfers as any))
27
// `run` wires the handler to incoming messages. The handler may return an
// Effect (which is forked) or nothing.
28 const run = Effect.fnUntraced(function*<A, E, R>(
29 handler: (portId: number, message: I) => Effect.Effect<A, E, R> | void
30 ) {
// Capture the current runtime and drop `Scope.Scope` from its context before
// it is used to fork handler effects.
31 const runtime = (yield* Effect.interruptible(Effect.runtime<R | Scope.Scope>())).pipe(
32 Runtime.updateContext(Context.omit(Scope.Scope))
33 ) as Runtime.Runtime<R>
// Every forked handler fiber is added to this set (see `FiberSet.unsafeAdd`
// in the message listener).
34 const fiberSet = yield* FiberSet.make<any, WorkerError | E>()
35 const runFork = Runtime.runFork(runtime)
// Fiber observer: a failure that is not interruption-only completes
// `closeLatch` with a defect built from the squashed cause.
36 const onExit = (exit: Exit.Exit<any, E>) => {
37 if (exit._tag === "Failure" && !Cause.isInterruptedOnly(exit.cause)) {
38 Deferred.unsafeDone(closeLatch, Exit.die(Cause.squash(exit.cause)))
39 }
40 }
// Listen on the worker port when present, otherwise on `process`.
// NOTE(review): no listener removal is visible in this block.
41 ;(WorkerThreads.parentPort ?? process).on("message", (message: Runner.BackingRunner.Message<I>) => {
// Tag `0`: hand the payload to the handler, always with port id 0.
42 if (message[0] === 0) {
43 const result = handler(0, message[1])
// Only Effect results are forked; a `void` result needs no further work.
44 if (Effect.isEffect(result)) {
45 const fiber = runFork(result)
46 fiber.addObserver(onExit)
47 FiberSet.unsafeAdd(fiberSet, fiber)
48 }
49 } else {
// Any other tag: close the port (or unref the IPC channel) and complete
// `closeLatch` successfully.
50 if (WorkerThreads.parentPort) {
51 WorkerThreads.parentPort.close()
52 } else {
53 process.channel?.unref()
54 }
55 Deferred.unsafeDone(closeLatch, Exit.void)
56 }
57 })
58
// Port-level failures are registered only for the worker-thread transport and
// complete `closeLatch` with a `WorkerError`.
59 if (WorkerThreads.parentPort) {
60 WorkerThreads.parentPort.on("messageerror", (cause) => {
61 Deferred.unsafeDone(closeLatch, new WorkerError({ reason: "decode", cause }))
62 })
63 WorkerThreads.parentPort.on("error", (cause) => {
64 Deferred.unsafeDone(closeLatch, new WorkerError({ reason: "unknown", cause }))
65 })
66 }
67
// Sent after all listeners above are registered.
// NOTE(review): `[0]` presumably tells the parent the runner is ready -
// confirm against the parent-side protocol.
68 unsafeSend([0])
69 })
70
71 return { run, send }
72 })
73 }

Callers

nothing calls this directly

Calls 9

runtime (Method) · 0.80
unsafeDone (Method) · 0.80
pipe (Method) · 0.65
omit (Method) · 0.65
make (Method) · 0.65
addObserver (Method) · 0.65
close (Method) · 0.65
handler (Function) · 0.50
runFork (Function) · 0.50

Tested by

no test coverage detected