MCPcopy Index your code
hub / github.com/Effect-TS/effect / makePool

Function makePool

packages/platform/src/internal/worker.ts:228–282  ·  view source on GitHub ↗
(
  options: Worker.WorkerPool.Options<I>
)

Source from the content-addressed store, hash-verified

226
227/** @internal */
228export const makePool = <I, O, E>(
229 options: Worker.WorkerPool.Options<I>
230) =>
231 Effect.gen(function*() {
232 const manager = yield* WorkerManager
233 const workers = new Set<Worker.Worker<I, O, E>>()
234 const acquire = pipe(
235 manager.spawn<I, O, E>(options),
236 Effect.tap((worker) =>
237 Effect.acquireRelease(
238 Effect.sync(() => workers.add(worker)),
239 () => Effect.sync(() => workers.delete(worker))
240 )
241 ),
242 options.onCreate ? Effect.tap(options.onCreate) : identity
243 )
244 const backing = "minSize" in options ?
245 yield* Pool.makeWithTTL({
246 acquire,
247 min: options.minSize,
248 max: options.maxSize,
249 concurrency: options.concurrency,
250 targetUtilization: options.targetUtilization,
251 timeToLive: options.timeToLive
252 }) :
253 yield* Pool.make({
254 acquire,
255 size: options.size,
256 concurrency: options.concurrency,
257 targetUtilization: options.targetUtilization
258 })
259 const pool: Worker.WorkerPool<I, O, E> = {
260 backing,
261 broadcast: (message: I) =>
262 Effect.forEach(workers, (worker) => worker.executeEffect(message), {
263 concurrency: "unbounded",
264 discard: true
265 }),
266 execute: (message: I) =>
267 Stream.unwrapScoped(Effect.map(
268 backing.get,
269 (worker) => worker.execute(message)
270 )),
271 executeEffect: (message: I) =>
272 Effect.scoped(Effect.flatMap(
273 backing.get,
274 (worker) => worker.executeEffect(message)
275 ))
276 }
277
278 // report any spawn errors
279 yield* Effect.scoped(backing.get)
280
281 return pool
282 })
283
284/** @internal */
285export const makePoolLayer = <Tag, I, O, E>(

Callers 1

makePoolLayerFunction · 0.85

Calls 6

syncMethod · 0.80
pipeFunction · 0.70
addMethod · 0.65
makeMethod · 0.65
mapMethod · 0.65
executeMethod · 0.45

Tested by

no test coverage detected