@internal
| 400 | |
| 401 | /** @internal */ |
| 402 | class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> { |
| 403 | /** Slot storage for published values; publish writes each value at `publisherIndex & mask`. */ array: Array<A> |
| 404 | /** Set to `capacity - 1` by the constructor and ANDed with publisherIndex to pick a slot. */ mask: number |
| 405 | /** Advanced by one each time publish stores a value in a slot. */ publisherIndex = 0 |
| 406 | /** Per-slot numbers; publish records the current subscriberCount for the slot it writes. */ subscribers: Array<number> |
| 407 | /** Presumably the number of active subscribers (not modified in this section); publish stores values only when it is non-zero. */ subscriberCount = 0 |
| 408 | /** Compared against publisherIndex by isEmpty, isFull and size; not modified in this section. */ subscribersIndex = 0 |
| 409 | |
| 410 | /** Allocates `capacity` slots for values and for per-slot subscriber counts, and derives the index mask. NOTE(review): the mask-based indexing presumably requires capacity to be a power of two (as the class name suggests); confirm at the call site. */ constructor(readonly capacity: number, readonly replayBuffer: ReplayBuffer<A> | undefined) { |
| 411 | /* slots are created without initial values */ this.array = Array.from({ length: capacity }) |
| 412 | /* used by publish as `publisherIndex & mask` */ this.mask = capacity - 1 |
| 413 | this.subscribers = Array.from({ length: capacity }) |
| 414 | } |
| 415 | |
| 416 | /** Returns a new ReplayWindowImpl over the replay buffer when one was supplied, otherwise the shared emptyReplayWindow value. */ replayWindow(): ReplayWindow<A> { |
| 417 | return this.replayBuffer ? new ReplayWindowImpl(this.replayBuffer) : emptyReplayWindow |
| 418 | } |
| 419 | |
| 420 | /** True when publisherIndex equals subscribersIndex, i.e. when size() is 0. */ isEmpty(): boolean { |
| 421 | return this.publisherIndex === this.subscribersIndex |
| 422 | } |
| 423 | |
| 424 | /** True when publisherIndex is exactly `capacity` ahead of subscribersIndex, i.e. when size() equals capacity. */ isFull(): boolean { |
| 425 | return this.publisherIndex === this.subscribersIndex + this.capacity |
| 426 | } |
| 427 | |
| 428 | /** Returns publisherIndex minus subscribersIndex; presumably the number of stored values not yet released by subscribers (TODO confirm against the subscription code). */ size(): number { |
| 429 | return this.publisherIndex - this.subscribersIndex |
| 430 | } |
| 431 | |
| 432 | /** Publishes a single value. Returns false, without storing or replaying anything, when isFull(); otherwise returns true. The value is written to a slot only when subscriberCount is non-zero, and is offered to the replay buffer (when present) in either case. */ publish(value: A): boolean { |
| 433 | if (this.isFull()) { |
| 434 | return false |
| 435 | } |
| 436 | /* with no subscribers the value is not stored in a slot */ if (this.subscriberCount !== 0) { |
| 437 | /* slot position: publisherIndex masked with capacity - 1 */ const index = this.publisherIndex & this.mask |
| 438 | this.array[index] = value |
| 439 | /* remember the subscriber count at publish time for this slot */ this.subscribers[index] = this.subscriberCount |
| 440 | this.publisherIndex += 1 |
| 441 | } |
| 442 | /* reached whether or not the value was stored above */ if (this.replayBuffer) { |
| 443 | this.replayBuffer.offer(value) |
| 444 | } |
| 445 | return true |
| 446 | } |
| 447 | |
| 448 | publishAll(elements: Iterable<A>): Chunk.Chunk<A> { |
| 449 | if (this.subscriberCount === 0) { |
| 450 | if (this.replayBuffer) { |
| 451 | this.replayBuffer.offerAll(elements) |
| 452 | } |
| 453 | return Chunk.empty() |
| 454 | } |
| 455 | const chunk = Chunk.fromIterable(elements) |
| 456 | const n = chunk.length |
| 457 | const size = this.publisherIndex - this.subscribersIndex |
| 458 | const available = this.capacity - size |
| 459 | const forPubSub = Math.min(n, available) |
nothing calls this directly
no outgoing calls
no test coverage detected