 * A strategy that applies back pressure to publishers when the `PubSub` is at
 * capacity. This guarantees that all subscribers will receive all messages
 * published to the `PubSub` while they are subscribed. However, it creates the
 * risk that a slow subscriber will slow down the rate at which messages are
 * published and received by other subscribers.
 *
| 1397 | * @internal |
| 1398 | */ |
| 1399 | class BackPressureStrategy<in out A> implements PubSubStrategy<A> { |
/**
 * Queue of publishers that are waiting for space in the `PubSub`.
 *
 * Each entry is a tuple of:
 * - `[0]` the element that still has to be published,
 * - `[1]` the deferred of the publisher that offered it,
 * - `[2]` a flag that gates whether the deferred is completed (on a
 *   successful publish) or interrupted (on shutdown) for this entry.
 */
publishers: MutableQueue.MutableQueue<readonly [A, Deferred.Deferred<boolean>, boolean]> =
  MutableQueue.unbounded()
| 1403 | |
/**
 * Effect that releases the publishers held by this strategy.
 *
 * It reads the current fiber id, takes every entry returned by
 * `unsafePollAllQueue(this.publishers)`, and interrupts (with that fiber
 * id) the deferred of each entry whose flag is set. Entries whose flag is
 * not set are skipped, so each deferred is only touched through its
 * flagged entry.
 */
get shutdown(): Effect.Effect<void> {
  return core.flatMap(core.fiberId, (interruptor) => {
    // Polling is wrapped in `sync` so it happens when the effect runs,
    // not when it is constructed.
    const drainPending = core.sync(() => unsafePollAllQueue(this.publishers))
    return core.flatMap(drainPending, (pending) =>
      fiberRuntime.forEachConcurrentDiscard(
        pending,
        (entry) => {
          const [, waiter, isLast] = entry
          if (!isLast) {
            return core.void
          }
          return core.asVoid(core.deferredInterruptWith(waiter, interruptor))
        },
        // NOTE(review): the two trailing flags are forwarded unchanged;
        // presumably `batching` / `processAll` — confirm against fiberRuntime.
        false,
        false
      ))
  })
}
| 1420 | |
/**
 * Handles elements that could not be published right away.
 *
 * A deferred is created for the calling fiber and handed to `unsafeOffer`
 * together with the elements. The strategy then calls
 * `unsafeOnPubSubEmptySpace` and `unsafeCompleteSubscribers` before
 * deciding how to proceed: if `isShutdown` is set the effect interrupts,
 * otherwise it waits on the deferred and yields its boolean result.
 *
 * If the returned effect is interrupted, the deferred is passed to
 * `unsafeRemove`.
 *
 * @param pubsub - the underlying `AtomicPubSub`.
 * @param subscribers - the current subscribers of the `PubSub`.
 * @param elements - the surplus elements still to be published.
 * @param isShutdown - mutable flag read after the elements are enqueued.
 */
handleSurplus(
  pubsub: AtomicPubSub<A>,
  subscribers: Subscribers<A>,
  elements: Iterable<A>,
  isShutdown: MutableRef.MutableRef<boolean>
): Effect.Effect<boolean> {
  return core.withFiberRuntime((fiber) => {
    const completion = core.deferredUnsafeMake<boolean>(fiber.id())

    // Suspended so the side effects below run when the effect is executed.
    const enqueueAndWait = core.suspend(() => {
      // Order matters: enqueue first, then try to make progress.
      this.unsafeOffer(elements, completion)
      this.unsafeOnPubSubEmptySpace(pubsub, subscribers)
      this.unsafeCompleteSubscribers(pubsub, subscribers)
      const next = MutableRef.get(isShutdown) ?
        core.interrupt :
        core.deferredAwait(completion)
      return next
    })

    // On interruption, hand this publisher's deferred to `unsafeRemove`.
    const removeOnInterrupt = core.onInterrupt(() => core.sync(() => this.unsafeRemove(completion)))

    return pipe(enqueueAndWait, removeOnInterrupt)
  })
}
| 1442 | |
| 1443 | unsafeOnPubSubEmptySpace( |
| 1444 | pubsub: AtomicPubSub<A>, |
| 1445 | subscribers: Subscribers<A> |
| 1446 | ): void { |
| 1447 | let keepPolling = true |
| 1448 | while (keepPolling && !pubsub.isFull()) { |
| 1449 | const publisher = pipe(this.publishers, MutableQueue.poll(MutableQueue.EmptyMutableQueue)) |
| 1450 | if (publisher === MutableQueue.EmptyMutableQueue) { |
| 1451 | keepPolling = false |
| 1452 | } else { |
| 1453 | const published = pubsub.publish(publisher[0]) |
| 1454 | if (published && publisher[2]) { |
| 1455 | unsafeCompleteDeferred(publisher[1], true) |
| 1456 | } else if (!published) { |
nothing calls this directly
no outgoing calls
no test coverage detected