| 6854 | } |
| 6855 | |
| 6856 | const throttleShapeEffect = <A, E, R, E2, R2>( |
| 6857 | self: Stream.Stream<A, E, R>, |
| 6858 | costFn: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>, |
| 6859 | units: number, |
| 6860 | duration: Duration.DurationInput, |
| 6861 | burst: number |
| 6862 | ): Stream.Stream<A, E | E2, R | R2> => { |
| 6863 | const loop = ( |
| 6864 | tokens: number, |
| 6865 | timestampMillis: number |
| 6866 | ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E | E2, E, void, unknown, R2> => |
| 6867 | core.readWithCause({ |
| 6868 | onInput: (input: Chunk.Chunk<A>) => |
| 6869 | pipe( |
| 6870 | costFn(input), |
| 6871 | Effect.zip(Clock.currentTimeMillis), |
| 6872 | Effect.map(([weight, currentTimeMillis]) => { |
| 6873 | const elapsed = currentTimeMillis - timestampMillis |
| 6874 | const cycles = elapsed / Duration.toMillis(duration) |
| 6875 | const sum = tokens + (cycles * units) |
| 6876 | const max = units + burst < 0 ? Number.POSITIVE_INFINITY : units + burst |
| 6877 | const available = sum < 0 ? max : Math.min(sum, max) |
| 6878 | const remaining = available - weight |
| 6879 | const waitCycles = remaining >= 0 ? 0 : -remaining / units |
| 6880 | const delay = Duration.millis(Math.max(0, waitCycles * Duration.toMillis(duration))) |
| 6881 | if (Duration.greaterThan(delay, Duration.zero)) { |
| 6882 | return pipe( |
| 6883 | core.fromEffect(Clock.sleep(delay)), |
| 6884 | channel.zipRight(core.write(input)), |
| 6885 | core.flatMap(() => loop(remaining, currentTimeMillis)) |
| 6886 | ) |
| 6887 | } |
| 6888 | return core.flatMap( |
| 6889 | core.write(input), |
| 6890 | () => loop(remaining, currentTimeMillis) |
| 6891 | ) |
| 6892 | }), |
| 6893 | channel.unwrap |
| 6894 | ), |
| 6895 | onFailure: core.failCause, |
| 6896 | onDone: () => core.void |
| 6897 | }) |
| 6898 | const throttled = pipe( |
| 6899 | Clock.currentTimeMillis, |
| 6900 | Effect.map((currentTimeMillis) => loop(units, currentTimeMillis)), |
| 6901 | channel.unwrap |
| 6902 | ) |
| 6903 | return new StreamImpl(pipe(toChannel(self), channel.pipeToOrFail(throttled))) |
| 6904 | } |
| 6905 | |
| 6906 | /** @internal */ |
| 6907 | export const tick = (interval: Duration.DurationInput): Stream.Stream<void> => |