(closeLatch: Deferred.Deferred<void, WorkerError>)
| 14 | const platformRunnerImpl = Runner.PlatformRunner.of({ |
| 15 | [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, |
| 16 | /** Worker-side startup: fails unless a parent channel exists, otherwise yields the `run` and `send` operations. `closeLatch` is completed by the listeners installed in `run`: with void on a message whose tag is not 0, or with a failure/defect on handler or port errors. */ start<I, O>(closeLatch: Deferred.Deferred<void, WorkerError>) { |
| 17 | return Effect.gen(function*() { |
| 18 | /* Neither `WorkerThreads.parentPort` nor `process.send` is available, so fail with a "spawn" WorkerError. */ if (!WorkerThreads.parentPort && !process.send) { |
| 19 | return yield* new WorkerError({ reason: "spawn", cause: new Error("not in a worker") }) |
| 20 | } |
| 21 | /* Pick the raw transport once: `postMessage` (with the transfer list) on a parent port, else `process.send`, which ignores transfers. */ |
| 22 | const unsafeSend = WorkerThreads.parentPort |
| 23 | ? (message: any, transfers?: any) => WorkerThreads.parentPort!.postMessage(message, transfers) |
| 24 | : (message: any, _transfers?: any) => process.send!(message) |
| 25 | /* Replies are sent as the tuple `[1, message]`; `_portId` is unused here. */ const send = (_portId: number, message: O, transfers?: ReadonlyArray<unknown>) => |
| 26 | Effect.sync(() => unsafeSend([1, message], transfers as any)) |
| 27 | /* `run` installs the listeners that feed incoming messages to `handler`, then posts `[0]` to the parent. */ |
| 28 | const run = Effect.fnUntraced(function*<A, E, R>( |
| 29 | handler: (portId: number, message: I) => Effect.Effect<A, E, R> | void |
| 30 | ) { |
| 31 | /* Capture the current runtime and drop `Scope` from its context before it is used to fork handler fibers. */ const runtime = (yield* Effect.interruptible(Effect.runtime<R | Scope.Scope>())).pipe( |
| 32 | Runtime.updateContext(Context.omit(Scope.Scope)) |
| 33 | ) as Runtime.Runtime<R> |
| 34 | /* Forked handler fibers are added to this set. */ const fiberSet = yield* FiberSet.make<any, WorkerError | E>() |
| 35 | const runFork = Runtime.runFork(runtime) |
| 36 | /* A handler fiber that fails for any reason other than interruption completes `closeLatch` with a defect built from the squashed cause. */ const onExit = (exit: Exit.Exit<any, E>) => { |
| 37 | if (exit._tag === "Failure" && !Cause.isInterruptedOnly(exit.cause)) { |
| 38 | Deferred.unsafeDone(closeLatch, Exit.die(Cause.squash(exit.cause))) |
| 39 | } |
| 40 | } |
| 41 | /* Messages are read from the parent port when present, otherwise from `process`. */ ;(WorkerThreads.parentPort ?? process).on("message", (message: Runner.BackingRunner.Message<I>) => { |
| 42 | /* Tag 0 carries a payload for the handler; the port id passed is always 0. */ if (message[0] === 0) { |
| 43 | const result = handler(0, message[1]) |
| 44 | /* Only Effect results are forked, observed and tracked; any other result is ignored. */ if (Effect.isEffect(result)) { |
| 45 | const fiber = runFork(result) |
| 46 | fiber.addObserver(onExit) |
| 47 | FiberSet.unsafeAdd(fiberSet, fiber) |
| 48 | } |
| 49 | /* Any other tag: close the port (or unref the process channel) and complete `closeLatch` with void. NOTE(review): presumably the parent's close request; confirm against the message protocol. */ } else { |
| 50 | if (WorkerThreads.parentPort) { |
| 51 | WorkerThreads.parentPort.close() |
| 52 | } else { |
| 53 | process.channel?.unref() |
| 54 | } |
| 55 | Deferred.unsafeDone(closeLatch, Exit.void) |
| 56 | } |
| 57 | }) |
| 58 | /* Port-level failures (parent port only) complete `closeLatch` with a WorkerError: reason "decode" for `messageerror`, reason "unknown" for `error`. */ |
| 59 | if (WorkerThreads.parentPort) { |
| 60 | WorkerThreads.parentPort.on("messageerror", (cause) => { |
| 61 | Deferred.unsafeDone(closeLatch, new WorkerError({ reason: "decode", cause })) |
| 62 | }) |
| 63 | WorkerThreads.parentPort.on("error", (cause) => { |
| 64 | Deferred.unsafeDone(closeLatch, new WorkerError({ reason: "unknown", cause })) |
| 65 | }) |
| 66 | } |
| 67 | /* Posted only after the listeners above are registered. NOTE(review): looks like a ready signal to the parent; confirm. */ |
| 68 | unsafeSend([0]) |
| 69 | }) |
| 70 | |
| 71 | return { run, send } |
| 72 | }) |
| 73 | } |
nothing calls this directly
no test coverage detected