MCPcopy Index your code
hub / github.com/Effect-TS/effect / SlidingStrategy

Class SlidingStrategy

packages/effect/src/internal/pubsub.ts:1565–1620  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

1563 * @internal
1564 */
1565export class SlidingStrategy<in out A> implements PubSubStrategy<A> {
1566 get shutdown(): Effect.Effect<void> {
1567 return core.void
1568 }
1569
1570 handleSurplus(
1571 pubsub: AtomicPubSub<A>,
1572 subscribers: Subscribers<A>,
1573 elements: Iterable<A>,
1574 _isShutdown: MutableRef.MutableRef<boolean>
1575 ): Effect.Effect<boolean> {
1576 return core.sync(() => {
1577 this.unsafeSlidingPublish(pubsub, elements)
1578 this.unsafeCompleteSubscribers(pubsub, subscribers)
1579 return true
1580 })
1581 }
1582
1583 unsafeOnPubSubEmptySpace(
1584 _pubsub: AtomicPubSub<A>,
1585 _subscribers: Subscribers<A>
1586 ): void {
1587 //
1588 }
1589
1590 unsafeCompletePollers(
1591 pubsub: AtomicPubSub<A>,
1592 subscribers: Subscribers<A>,
1593 subscription: Subscription<A>,
1594 pollers: MutableQueue.MutableQueue<Deferred.Deferred<A>>
1595 ): void {
1596 return unsafeStrategyCompletePollers(this, pubsub, subscribers, subscription, pollers)
1597 }
1598
1599 unsafeCompleteSubscribers(pubsub: AtomicPubSub<A>, subscribers: Subscribers<A>): void {
1600 return unsafeStrategyCompleteSubscribers(this, pubsub, subscribers)
1601 }
1602
1603 unsafeSlidingPublish(pubsub: AtomicPubSub<A>, elements: Iterable<A>): void {
1604 const it = elements[Symbol.iterator]()
1605 let next = it.next()
1606 if (!next.done && pubsub.capacity > 0) {
1607 let a = next.value
1608 let loop = true
1609 while (loop) {
1610 pubsub.slide()
1611 const pub = pubsub.publish(a)
1612 if (pub && (next = it.next()) && !next.done) {
1613 a = next.value
1614 } else if (pub) {
1615 loop = false
1616 }
1617 }
1618 }
1619 }
1620}
1621
1622/** @internal */

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected