MCPcopy Index your code
hub / github.com/Effect-TS/effect / throttleShapeEffect

Function throttleShapeEffect

packages/effect/src/internal/stream.ts:6856–6904  ·  view source on GitHub ↗
(
  self: Stream.Stream<A, E, R>,
  costFn: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>,
  units: number,
  duration: Duration.DurationInput,
  burst: number
)

Source from the content-addressed store, hash-verified

6854}
6855
6856const throttleShapeEffect = <A, E, R, E2, R2>(
6857 self: Stream.Stream<A, E, R>,
6858 costFn: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>,
6859 units: number,
6860 duration: Duration.DurationInput,
6861 burst: number
6862): Stream.Stream<A, E | E2, R | R2> => {
6863 const loop = (
6864 tokens: number,
6865 timestampMillis: number
6866 ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E | E2, E, void, unknown, R2> =>
6867 core.readWithCause({
6868 onInput: (input: Chunk.Chunk<A>) =>
6869 pipe(
6870 costFn(input),
6871 Effect.zip(Clock.currentTimeMillis),
6872 Effect.map(([weight, currentTimeMillis]) => {
6873 const elapsed = currentTimeMillis - timestampMillis
6874 const cycles = elapsed / Duration.toMillis(duration)
6875 const sum = tokens + (cycles * units)
6876 const max = units + burst < 0 ? Number.POSITIVE_INFINITY : units + burst
6877 const available = sum < 0 ? max : Math.min(sum, max)
6878 const remaining = available - weight
6879 const waitCycles = remaining >= 0 ? 0 : -remaining / units
6880 const delay = Duration.millis(Math.max(0, waitCycles * Duration.toMillis(duration)))
6881 if (Duration.greaterThan(delay, Duration.zero)) {
6882 return pipe(
6883 core.fromEffect(Clock.sleep(delay)),
6884 channel.zipRight(core.write(input)),
6885 core.flatMap(() => loop(remaining, currentTimeMillis))
6886 )
6887 }
6888 return core.flatMap(
6889 core.write(input),
6890 () => loop(remaining, currentTimeMillis)
6891 )
6892 }),
6893 channel.unwrap
6894 ),
6895 onFailure: core.failCause,
6896 onDone: () => core.void
6897 })
6898 const throttled = pipe(
6899 Clock.currentTimeMillis,
6900 Effect.map((currentTimeMillis) => loop(units, currentTimeMillis)),
6901 channel.unwrap
6902 )
6903 return new StreamImpl(pipe(toChannel(self), channel.pipeToOrFail(throttled)))
6904}
6905
6906/** @internal */
6907export const tick = (interval: Duration.DurationInput): Stream.Stream<void> =>

Callers 1

stream.tsFile · 0.85

Calls 4

pipeFunction · 0.70
loopFunction · 0.70
toChannelFunction · 0.70
mapMethod · 0.65

Tested by

no test coverage detected