(this: this, max: number)
| 1051 | } |
| 1052 | |
/**
 * Builds an effect that yields up to `max` elements for this subscription,
 * drawing first from the replay window and then from the subscription itself.
 *
 * - If the shutdown flag is set when the effect body runs, the result is
 *   `core.interrupt`.
 * - If the replay window holds at least `max` elements, exactly that many are
 *   taken from it and the subscription is not polled.
 * - Otherwise whatever is left in the replay window is drained, and the
 *   subscription is polled for the outstanding count, but only while no
 *   pollers are queued; with pollers pending, nothing extra is polled.
 *
 * @param max - Upper bound on the number of elements to return.
 * @returns An effect producing the replayed elements followed by the polled ones.
 */
takeUpTo(this: this, max: number): Effect.Effect<Chunk.Chunk<A>> {
  return core.suspend(() => {
    if (MutableRef.get(this.shutdownFlag)) {
      return core.interrupt
    }

    const window = this.replayWindow

    // The replay window alone can cover the whole request.
    if (window.remaining >= max) {
      return core.succeed(window.takeN(max))
    }

    // Drain a partially filled replay window and shrink the amount still owed.
    const replayed: Chunk.Chunk<A> | undefined = window.remaining > 0
      ? window.takeAll()
      : undefined
    const outstanding = replayed === undefined ? max : max - replayed.length

    // Only poll the subscription directly when no pollers are queued.
    const polled = MutableQueue.isEmpty(this.pollers)
      ? unsafePollN(this.subscription, outstanding)
      : Chunk.empty()

    // Notify the strategy after the poll step (runs on both branches above).
    this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers)

    return core.succeed(
      replayed === undefined ? polled : Chunk.appendAll(replayed, polled)
    )
  })
}
| 1073 | |
| 1074 | takeBetween(min: number, max: number): Effect.Effect<Chunk.Chunk<A>> { |
| 1075 | return core.suspend(() => takeRemainderLoop(this, min, max, Chunk.empty())) |
nothing calls this directly
no test coverage detected