| 226 | |
| 227 | /** @internal */ |
| 228 | export const makePool = <I, O, E>( |
| 229 | options: Worker.WorkerPool.Options<I> |
| 230 | ) => |
| 231 | Effect.gen(function*() { |
| 232 | const manager = yield* WorkerManager |
| 233 | const workers = new Set<Worker.Worker<I, O, E>>() |
| 234 | const acquire = pipe( |
| 235 | manager.spawn<I, O, E>(options), |
| 236 | Effect.tap((worker) => |
| 237 | Effect.acquireRelease( |
| 238 | Effect.sync(() => workers.add(worker)), |
| 239 | () => Effect.sync(() => workers.delete(worker)) |
| 240 | ) |
| 241 | ), |
| 242 | options.onCreate ? Effect.tap(options.onCreate) : identity |
| 243 | ) |
| 244 | const backing = "minSize" in options ? |
| 245 | yield* Pool.makeWithTTL({ |
| 246 | acquire, |
| 247 | min: options.minSize, |
| 248 | max: options.maxSize, |
| 249 | concurrency: options.concurrency, |
| 250 | targetUtilization: options.targetUtilization, |
| 251 | timeToLive: options.timeToLive |
| 252 | }) : |
| 253 | yield* Pool.make({ |
| 254 | acquire, |
| 255 | size: options.size, |
| 256 | concurrency: options.concurrency, |
| 257 | targetUtilization: options.targetUtilization |
| 258 | }) |
| 259 | const pool: Worker.WorkerPool<I, O, E> = { |
| 260 | backing, |
| 261 | broadcast: (message: I) => |
| 262 | Effect.forEach(workers, (worker) => worker.executeEffect(message), { |
| 263 | concurrency: "unbounded", |
| 264 | discard: true |
| 265 | }), |
| 266 | execute: (message: I) => |
| 267 | Stream.unwrapScoped(Effect.map( |
| 268 | backing.get, |
| 269 | (worker) => worker.execute(message) |
| 270 | )), |
| 271 | executeEffect: (message: I) => |
| 272 | Effect.scoped(Effect.flatMap( |
| 273 | backing.get, |
| 274 | (worker) => worker.executeEffect(message) |
| 275 | )) |
| 276 | } |
| 277 | |
| 278 | // report any spawn errors |
| 279 | yield* Effect.scoped(backing.get) |
| 280 | |
| 281 | return pool |
| 282 | }) |
| 283 | |
| 284 | /** @internal */ |
| 285 | export const makePoolLayer = <Tag, I, O, E>( |