MCPcopy Index your code
hub / github.com/Effect-TS/effect / BackPressureStrategy

Class BackPressureStrategy

packages/effect/src/internal/pubsub.ts:1399–1509  ·  view source on GitHub ↗

A strategy that applies back pressure to publishers when the `PubSub` is at capacity. This guarantees that all subscribers will receive all messages published to the `PubSub` while they are subscribed. However, it creates the risk that a slow subscriber will slow down the rate at which messages are published and received by other subscribers.

Source from the content-addressed store, hash-verified

1397 * @internal
1398 */
1399class BackPressureStrategy<in out A> implements PubSubStrategy<A> {
/**
 * Queue of pending publisher entries, created as an unbounded `MutableQueue`.
 *
 * Each entry is a readonly tuple of:
 *  - `[0]` the value of type `A` to be published,
 *  - `[1]` a `Deferred<boolean>` associated with that entry,
 *  - `[2]` a boolean flag (destructured as `last` in `shutdown`); the deferred
 *    is only completed or interrupted by this class when the flag is `true`.
 */
1400 publishers: MutableQueue.MutableQueue<
1401 readonly [A, Deferred.Deferred<boolean>, boolean]
1402 > = MutableQueue.unbounded()
1403
/**
 * Effect that shuts this strategy down.
 *
 * When run, it reads the current fiber id, takes the entries out of
 * `this.publishers` via `unsafePollAllQueue`, and for every entry whose
 * `last` flag is `true` interrupts that entry's deferred with the current
 * fiber id. Entries whose flag is `false` are skipped.
 *
 * @returns an `Effect<void>`; results of the individual interrupts are discarded.
 */
1404 get shutdown(): Effect.Effect<void> {
1405 return core.flatMap(core.fiberId, (fiberId) =>
1406 core.flatMap(
// Wrapped in `core.sync` so the queue is polled when the effect runs,
// not when the getter is accessed.
1407 core.sync(() => unsafePollAllQueue(this.publishers)),
1408 (publishers) =>
1409 fiberRuntime.forEachConcurrentDiscard(
1410 publishers,
1411 ([_, deferred, last]) =>
// Only entries flagged `last` have their deferred interrupted.
1412 last ?
1413 pipe(core.deferredInterruptWith(deferred, fiberId), core.asVoid) :
1414 core.void,
// NOTE(review): the meaning of these two `false` arguments is defined by
// `forEachConcurrentDiscard`, which is not visible here — confirm.
1415 false,
1416 false
1417 )
1418 ))
1419 }
1420
/**
 * Handles elements passed in as surplus by queueing them on this strategy
 * and waiting on a deferred created for the call.
 * (Presumably invoked when the `PubSub` cannot accept the elements
 * directly; the caller is not visible here — confirm.)
 *
 * @param pubsub - the underlying `AtomicPubSub` to publish into.
 * @param subscribers - the current subscribers, forwarded to the helper calls.
 * @param elements - the elements handed to `unsafeOffer` together with the deferred.
 * @param isShutdown - mutable flag; when it reads `true` the effect interrupts
 *   instead of awaiting.
 * @returns an effect yielding the boolean the deferred is completed with, or
 *   interrupting if `isShutdown` is set.
 */
1421 handleSurplus(
1422 pubsub: AtomicPubSub<A>,
1423 subscribers: Subscribers<A>,
1424 elements: Iterable<A>,
1425 isShutdown: MutableRef.MutableRef<boolean>
1426 ): Effect.Effect<boolean> {
1427 return core.withFiberRuntime((state) => {
// The deferred is created eagerly, using the id of the running fiber.
1428 const deferred = core.deferredUnsafeMake<boolean>(state.id())
1429 return pipe(
1430 core.suspend(() => {
// Queue the elements with the deferred, then run the two helpers before
// checking the shutdown flag.
1431 this.unsafeOffer(elements, deferred)
1432 this.unsafeOnPubSubEmptySpace(pubsub, subscribers)
1433 this.unsafeCompleteSubscribers(pubsub, subscribers)
// The shutdown flag is read only after the offer and helper calls above.
1434 return MutableRef.get(isShutdown) ?
1435 core.interrupt :
1436 core.deferredAwait(deferred)
1437 }),
// If this effect is interrupted, remove what was registered for this deferred.
1438 core.onInterrupt(() => core.sync(() => this.unsafeRemove(deferred)))
1439 )
1440 })
1441 }
1442
1443 unsafeOnPubSubEmptySpace(
1444 pubsub: AtomicPubSub<A>,
1445 subscribers: Subscribers<A>
1446 ): void {
1447 let keepPolling = true
1448 while (keepPolling && !pubsub.isFull()) {
1449 const publisher = pipe(this.publishers, MutableQueue.poll(MutableQueue.EmptyMutableQueue))
1450 if (publisher === MutableQueue.EmptyMutableQueue) {
1451 keepPolling = false
1452 } else {
1453 const published = pubsub.publish(publisher[0])
1454 if (published && publisher[2]) {
1455 unsafeCompleteDeferred(publisher[1], true)
1456 } else if (!published) {

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected