MCPcopy Index your code
hub / github.com/Effect-TS/effect / BoundedPubSubPow2Subscription

Class BoundedPubSubPow2Subscription

packages/effect/src/internal/pubsub.ts:497–581  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

495
496/** @internal */
497class BoundedPubSubPow2Subscription<in out A> implements Subscription<A> {
/**
 * @param self - The pubsub this subscription reads from; its `array`,
 *   `subscribers`, `mask`, `publisherIndex` and `subscribersIndex` are
 *   accessed by the polling methods.
 * @param subscriberIndex - This subscription's own read position. It is
 *   masked with `self.mask` to locate a slot and advanced after each read.
 * @param unsubscribed - When `true`, the query and polling methods of this
 *   subscription short-circuit and report no data.
 */
constructor(
  private self: BoundedPubSubPow2<A>,
  private subscriberIndex: number,
  private unsubscribed: boolean
) {}
504
/**
 * Reports whether this subscription currently has nothing to read.
 *
 * @returns `true` when the subscription is unsubscribed, or when the
 *   pubsub's `publisherIndex` equals either this subscription's own
 *   `subscriberIndex` or the pubsub-wide `subscribersIndex`.
 */
isEmpty(): boolean {
  if (this.unsubscribed) {
    return true
  }
  const pubsub = this.self
  // Caught up with the publisher from this subscription's point of view.
  if (pubsub.publisherIndex === this.subscriberIndex) {
    return true
  }
  // Otherwise empty only if the shared subscribers index has reached the publisher.
  return pubsub.publisherIndex === pubsub.subscribersIndex
}
512
/**
 * Computes the distance between the publisher position and this
 * subscription's effective read position.
 *
 * @returns `0` when unsubscribed; otherwise `publisherIndex` minus the larger
 *   of this subscription's `subscriberIndex` and the pubsub-wide
 *   `subscribersIndex`.
 */
size() {
  if (this.unsubscribed) {
    return 0
  }
  const pubsub = this.self
  const publishedUpTo = pubsub.publisherIndex
  // The read position never lags behind the shared subscribers index.
  const readFrom = Math.max(this.subscriberIndex, pubsub.subscribersIndex)
  return publishedUpTo - readFrom
}
519
/**
 * Takes the next element for this subscription, if there is one.
 *
 * @param default_ - Value handed back when nothing can be read.
 * @returns The element stored at this subscription's current slot, or
 *   `default_` when the subscription is unsubscribed or has caught up with
 *   `publisherIndex`.
 */
poll<D>(default_: D): A | D {
  if (this.unsubscribed) {
    return default_
  }
  const pubsub = this.self
  // Move the read position forward to the shared subscribers index if it lags behind.
  this.subscriberIndex = Math.max(this.subscriberIndex, pubsub.subscribersIndex)
  if (this.subscriberIndex === pubsub.publisherIndex) {
    return default_
  }
  const slot = this.subscriberIndex & pubsub.mask
  const value = pubsub.array[slot]!
  // One fewer pending reader for this slot.
  pubsub.subscribers[slot] -= 1
  if (pubsub.subscribers[slot] === 0) {
    // The slot's counter reached zero: clear it and advance the shared index.
    pubsub.array[slot] = AbsentValue as unknown as A
    pubsub.subscribersIndex += 1
  }
  this.subscriberIndex += 1
  return value
}
538
539 pollUpTo(n: number): Chunk.Chunk<A> {
540 if (this.unsubscribed) {
541 return Chunk.empty()
542 }
543 this.subscriberIndex = Math.max(this.subscriberIndex, this.self.subscribersIndex)
544 const size = this.self.publisherIndex - this.subscriberIndex
545 const toPoll = Math.min(n, size)
546 if (toPoll <= 0) {
547 return Chunk.empty()
548 }
549 const builder: Array<A> = []
550 const pollUpToIndex = this.subscriberIndex + toPoll
551 while (this.subscriberIndex !== pollUpToIndex) {
552 const index = this.subscriberIndex & this.self.mask
553 const elem = this.self.array[index] as A
554 this.self.subscribers[index] -= 1

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected