| 312 | } |
| 313 | |
| 314 | class BoundedPubSubArbSubscription<in out A> implements Subscription<A> { |
/**
 * Creates a subscription view over a bounded pub/sub.
 *
 * @param self - The pub/sub whose `publisherIndex`, `subscribersIndex`,
 *   `capacity`, `array` and `subscribers` are read (and partly mutated) by
 *   this subscription's methods.
 * @param subscriberIndex - This subscription's own read position; it is
 *   advanced as elements are polled.
 * @param unsubscribed - When `true`, the subscription reports itself as
 *   empty and polling yields only the fallback value / an empty chunk.
 */
constructor(
  private self: BoundedPubSubArb<A>,
  private subscriberIndex: number,
  private unsubscribed: boolean
) {}
| 321 | |
/**
 * Reports whether this subscription currently has nothing to poll.
 *
 * @returns `true` when the subscription is unsubscribed, or when the
 *   pub/sub's `publisherIndex` equals either this subscription's own
 *   `subscriberIndex` or the pub/sub's shared `subscribersIndex`.
 */
isEmpty(): boolean {
  // An unsubscribed subscription is always considered empty.
  if (this.unsubscribed) {
    return true
  }
  const published = this.self.publisherIndex
  if (published === this.subscriberIndex) {
    return true
  }
  return published === this.self.subscribersIndex
}
| 329 | |
/**
 * Number of elements this subscription could still poll.
 *
 * @returns `0` when unsubscribed; otherwise the distance from the effective
 *   read position to the pub/sub's `publisherIndex`.
 */
size() {
  if (this.unsubscribed) {
    return 0
  }
  // The effective read position is whichever is further along: this
  // subscription's own index or the pub/sub's shared subscribers index.
  const readPosition = Math.max(this.subscriberIndex, this.self.subscribersIndex)
  return this.self.publisherIndex - readPosition
}
| 336 | |
/**
 * Takes the next element for this subscription, if there is one.
 *
 * The subscription's index is first moved forward to at least the pub/sub's
 * `subscribersIndex`. If it then equals `publisherIndex`, nothing is
 * available and `default_` is returned.
 *
 * Otherwise the element stored at slot `subscriberIndex % capacity` is read
 * and that slot's pending-subscriber count is decremented. When the count
 * reaches zero, the slot is overwritten with `AbsentValue` and the pub/sub's
 * `subscribersIndex` is advanced. Finally this subscription's index is
 * advanced by one.
 *
 * @param default_ - Value returned when unsubscribed or when no element is
 *   available.
 * @returns The polled element, or `default_`.
 */
poll<D>(default_: D): A | D {
  if (this.unsubscribed) {
    return default_
  }
  const pubsub = this.self
  // Catch up to the pub/sub's shared subscribers index before reading.
  this.subscriberIndex = Math.max(this.subscriberIndex, pubsub.subscribersIndex)
  if (this.subscriberIndex === pubsub.publisherIndex) {
    // Already caught up with the publisher: nothing to take.
    return default_
  }
  const slot = this.subscriberIndex % pubsub.capacity
  const value = pubsub.array[slot]!
  pubsub.subscribers[slot] -= 1
  if (pubsub.subscribers[slot] === 0) {
    // Slot's subscriber count hit zero: clear it and advance the shared index.
    pubsub.array[slot] = AbsentValue as unknown as A
    pubsub.subscribersIndex += 1
  }
  this.subscriberIndex += 1
  return value
}
| 355 | |
| 356 | pollUpTo(n: number): Chunk.Chunk<A> { |
| 357 | if (this.unsubscribed) { |
| 358 | return Chunk.empty() |
| 359 | } |
| 360 | this.subscriberIndex = Math.max(this.subscriberIndex, this.self.subscribersIndex) |
| 361 | const size = this.self.publisherIndex - this.subscriberIndex |
| 362 | const toPoll = Math.min(n, size) |
| 363 | if (toPoll <= 0) { |
| 364 | return Chunk.empty() |
| 365 | } |
| 366 | const builder: Array<A> = [] |
| 367 | const pollUpToIndex = this.subscriberIndex + toPoll |
| 368 | while (this.subscriberIndex !== pollUpToIndex) { |
| 369 | const index = this.subscriberIndex % this.self.capacity |
| 370 | const a = this.self.array[index] as A |
| 371 | this.self.subscribers[index] -= 1 |
nothing calls this directly
no outgoing calls
no test coverage detected