MCPcopy
hub / github.com/Effect-TS/effect / makePoolSerialized

Function makePoolSerialized

packages/platform/src/internal/worker.ts:332–381  ·  view source on GitHub ↗
(
  options: Worker.SerializedWorkerPool.Options<I>
)

Source from the content-addressed store, hash-verified

330
331/** @internal */
332export const makePoolSerialized = <I extends Schema.TaggedRequest.All>(
333 options: Worker.SerializedWorkerPool.Options<I>
334) =>
335 Effect.gen(function*() {
336 const manager = yield* WorkerManager
337 const workers = new Set<Worker.SerializedWorker<I>>()
338 const acquire = pipe(
339 makeSerialized<I>(options),
340 Effect.tap((worker) => Effect.sync(() => workers.add(worker))),
341 Effect.tap((worker) => Effect.addFinalizer(() => Effect.sync(() => workers.delete(worker)))),
342 options.onCreate
343 ? Effect.tap(
344 options.onCreate as (worker: Worker.SerializedWorker<I>) => Effect.Effect<void, WorkerError>
345 )
346 : identity,
347 Effect.provideService(WorkerManager, manager)
348 )
349 const backing = yield* "timeToLive" in options ?
350 Pool.makeWithTTL({
351 acquire,
352 min: options.minSize,
353 max: options.maxSize,
354 concurrency: options.concurrency,
355 targetUtilization: options.targetUtilization,
356 timeToLive: options.timeToLive
357 }) :
358 Pool.make({
359 acquire,
360 size: options.size,
361 concurrency: options.concurrency,
362 targetUtilization: options.targetUtilization
363 })
364 const pool: Worker.SerializedWorkerPool<I> = {
365 backing,
366 broadcast: <Req extends I>(message: Req) =>
367 Effect.forEach(workers, (worker) => worker.executeEffect(message), {
368 concurrency: "unbounded",
369 discard: true
370 }) as any,
371 execute: <Req extends I>(message: Req) =>
372 Stream.unwrapScoped(Effect.map(backing.get, (worker) => worker.execute(message))) as any,
373 executeEffect: <Req extends I>(message: Req) =>
374 Effect.scoped(Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))) as any
375 }
376
377 // report any spawn errors
378 yield* Effect.scoped(backing.get)
379
380 return pool
381 })
382
383/** @internal */
384export const makePoolSerializedLayer = <Tag, I extends Schema.TaggedRequest.All>(

Callers 1

makePoolSerializedLayerFunction · 0.85

Calls 8

syncMethod · 0.80
pipeFunction · 0.70
makeSerializedFunction · 0.70
addMethod · 0.65
addFinalizerMethod · 0.65
makeMethod · 0.65
mapMethod · 0.65
executeMethod · 0.45

Tested by

no test coverage detected