MCPcopy
hub / github.com/Effect-TS/effect / BoundedPubSubSingle

Class BoundedPubSubSingle

packages/effect/src/internal/pubsub.ts:584–660  ·  view source on GitHub ↗

@internal

Source from the content-addressed store, hash-verified

582
583/** @internal */
584class BoundedPubSubSingle<in out A> implements AtomicPubSub<A> {
585 publisherIndex = 0
586 subscriberCount = 0
587 subscribers = 0
588 value: A = AbsentValue as unknown as A
589
590 readonly capacity = 1
591 constructor(readonly replayBuffer: ReplayBuffer<A> | undefined) {}
592
593 replayWindow(): ReplayWindow<A> {
594 return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow
595 }
596
597 pipe() {
598 return pipeArguments(this, arguments)
599 }
600
601 isEmpty(): boolean {
602 return this.subscribers === 0
603 }
604
605 isFull(): boolean {
606 return !this.isEmpty()
607 }
608
609 size(): number {
610 return this.isEmpty() ? 0 : 1
611 }
612
613 publish(value: A): boolean {
614 if (this.isFull()) {
615 return false
616 }
617 if (this.subscriberCount !== 0) {
618 this.value = value
619 this.subscribers = this.subscriberCount
620 this.publisherIndex += 1
621 }
622 if (this.replayBuffer) {
623 this.replayBuffer.offer(value)
624 }
625 return true
626 }
627
628 publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
629 if (this.subscriberCount === 0) {
630 if (this.replayBuffer) {
631 this.replayBuffer.offerAll(elements)
632 }
633 return Chunk.empty()
634 }
635 const chunk = Chunk.fromIterable(elements)
636 if (Chunk.isEmpty(chunk)) {
637 return chunk
638 }
639 if (this.publish(Chunk.unsafeHead(chunk))) {
640 return Chunk.drop(chunk, 1)
641 } else {

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected