| 6811 | ) |
| 6812 | |
| 6813 | const throttleEnforceEffect = <A, E, R, E2, R2>( |
| 6814 | self: Stream.Stream<A, E, R>, |
| 6815 | cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>, |
| 6816 | units: number, |
| 6817 | duration: Duration.DurationInput, |
| 6818 | burst: number |
| 6819 | ): Stream.Stream<A, E | E2, R | R2> => { |
| 6820 | const loop = ( |
| 6821 | tokens: number, |
| 6822 | timestampMillis: number |
| 6823 | ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E | E2, E, void, unknown, R2> => |
| 6824 | core.readWithCause({ |
| 6825 | onInput: (input: Chunk.Chunk<A>) => |
| 6826 | pipe( |
| 6827 | cost(input), |
| 6828 | Effect.zip(Clock.currentTimeMillis), |
| 6829 | Effect.map(([weight, currentTimeMillis]) => { |
| 6830 | const elapsed = currentTimeMillis - timestampMillis |
| 6831 | const cycles = elapsed / Duration.toMillis(duration) |
| 6832 | const sum = tokens + (cycles * units) |
| 6833 | const max = units + burst < 0 ? Number.POSITIVE_INFINITY : units + burst |
| 6834 | const available = sum < 0 ? max : Math.min(sum, max) |
| 6835 | if (weight <= available) { |
| 6836 | return pipe( |
| 6837 | core.write(input), |
| 6838 | core.flatMap(() => loop(available - weight, currentTimeMillis)) |
| 6839 | ) |
| 6840 | } |
| 6841 | return loop(tokens, timestampMillis) |
| 6842 | }), |
| 6843 | channel.unwrap |
| 6844 | ), |
| 6845 | onFailure: core.failCause, |
| 6846 | onDone: () => core.void |
| 6847 | }) |
| 6848 | const throttled = pipe( |
| 6849 | Clock.currentTimeMillis, |
| 6850 | Effect.map((currentTimeMillis) => loop(units, currentTimeMillis)), |
| 6851 | channel.unwrap |
| 6852 | ) |
| 6853 | return new StreamImpl(pipe(toChannel(self), channel.pipeToOrFail(throttled))) |
| 6854 | } |
| 6855 | |
| 6856 | const throttleShapeEffect = <A, E, R, E2, R2>( |
| 6857 | self: Stream.Stream<A, E, R>, |