MCPcopy
hub / github.com/Effect-TS/effect / strategyUsageTTL

Function strategyUsageTTL

packages/effect/src/internal/pool.ts:386–423  ·  view source on GitHub ↗
(ttl: Duration.DurationInput)

Source from the content-addressed store, hash-verified

384 )
385
/**
 * Builds a pool strategy backed by an unbounded queue of pool items.
 *
 * - `run` repeats forever: wait `ttl`, then take items off the queue and
 *   invalidate them for as long as `pool.activeSize` exceeds `pool.targetSize`.
 * - `onAcquire` offers the acquired item to the queue.
 * - `reclaim` moves the first invalidated item that does not have
 *   `disableReclaim` set out of `pool.invalidated`, and offers it to the queue.
 *
 * @param ttl - delay applied before every shrink pass.
 * @returns an effect that yields the `Strategy` once the queue is created.
 */
const strategyUsageTTL = <A, E>(ttl: Duration.DurationInput) =>
  core.map(internalQueue.unbounded<PoolItem<A, E>>(), (usageQueue) =>
    identity<Strategy<A, E>>({
      run: (pool) => {
        // A single shrink pass. It re-evaluates the size difference on every
        // step (via `suspend`) and recurses until the pool is no longer above
        // its target size.
        const shrink: Effect<void> = core.suspend(() =>
          pool.activeSize - pool.targetSize <= 0
            ? core.void
            : usageQueue.take.pipe(
              core.tap((queued) => pool.invalidatePoolItem(queued)),
              core.zipRight(shrink)
            )
        )
        // Delay each pass by `ttl` and repeat indefinitely.
        return shrink.pipe(
          coreEffect.delay(ttl),
          coreEffect.forever
        )
      },
      onAcquire: (acquired) => usageQueue.offer(acquired),
      reclaim: (pool) =>
        core.suspend((): Effect<Option.Option<PoolItem<A, E>>> => {
          // Nothing has been invalidated, so there is nothing to reclaim.
          if (pool.invalidated.size === 0) return coreEffect.succeedNone

          // First invalidated entry that is still allowed to be reclaimed.
          const candidate = Iterable.head(
            Iterable.filter(pool.invalidated, (entry) => !entry.disableReclaim)
          )
          if (candidate._tag === "None") return coreEffect.succeedNone

          const reclaimed = candidate.value
          pool.invalidated.delete(reclaimed)
          // Only mark the item as available again while it is below the
          // pool's concurrency limit.
          if (reclaimed.refCount < pool.concurrency) {
            pool.available.add(reclaimed)
          }
          // Put the item back on the queue and hand it to the caller.
          return core.as(usageQueue.offer(reclaimed), candidate)
        })
    }))
424
425const reportUnhandledError = <E>(cause: Cause<E>) =>
426 core.withFiberRuntime<void>((fiber) => {

Callers 1

makeWithTTLFunction · 0.85

Calls 5

identityFunction · 0.85
invalidatePoolItemMethod · 0.80
mapMethod · 0.65
pipeMethod · 0.65
offerMethod · 0.65

Tested by

no test coverage detected