MCPcopy Index your code
hub / github.com/Effect-TS/effect / throttleEnforceEffect

Function throttleEnforceEffect

packages/effect/src/internal/stream.ts:6813–6854  ·  view source on GitHub ↗
(
  self: Stream.Stream<A, E, R>,
  cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>,
  units: number,
  duration: Duration.DurationInput,
  burst: number
)

Source from the content-addressed store, hash-verified

6811)
6812
6813const throttleEnforceEffect = <A, E, R, E2, R2>(
6814 self: Stream.Stream<A, E, R>,
6815 cost: (chunk: Chunk.Chunk<A>) => Effect.Effect<number, E2, R2>,
6816 units: number,
6817 duration: Duration.DurationInput,
6818 burst: number
6819): Stream.Stream<A, E | E2, R | R2> => {
6820 const loop = (
6821 tokens: number,
6822 timestampMillis: number
6823 ): Channel.Channel<Chunk.Chunk<A>, Chunk.Chunk<A>, E | E2, E, void, unknown, R2> =>
6824 core.readWithCause({
6825 onInput: (input: Chunk.Chunk<A>) =>
6826 pipe(
6827 cost(input),
6828 Effect.zip(Clock.currentTimeMillis),
6829 Effect.map(([weight, currentTimeMillis]) => {
6830 const elapsed = currentTimeMillis - timestampMillis
6831 const cycles = elapsed / Duration.toMillis(duration)
6832 const sum = tokens + (cycles * units)
6833 const max = units + burst < 0 ? Number.POSITIVE_INFINITY : units + burst
6834 const available = sum < 0 ? max : Math.min(sum, max)
6835 if (weight <= available) {
6836 return pipe(
6837 core.write(input),
6838 core.flatMap(() => loop(available - weight, currentTimeMillis))
6839 )
6840 }
6841 return loop(tokens, timestampMillis)
6842 }),
6843 channel.unwrap
6844 ),
6845 onFailure: core.failCause,
6846 onDone: () => core.void
6847 })
6848 const throttled = pipe(
6849 Clock.currentTimeMillis,
6850 Effect.map((currentTimeMillis) => loop(units, currentTimeMillis)),
6851 channel.unwrap
6852 )
6853 return new StreamImpl(pipe(toChannel(self), channel.pipeToOrFail(throttled)))
6854}
6855
6856const throttleShapeEffect = <A, E, R, E2, R2>(
6857 self: Stream.Stream<A, E, R>,

Callers 1

stream.tsFile · 0.85

Calls 4

pipeFunction · 0.70
loopFunction · 0.70
toChannelFunction · 0.70
mapMethod · 0.65

Tested by

no test coverage detected