MCPcopy Index your code
hub / github.com/Effect-TS/effect / raceAll

Function raceAll

packages/effect/src/internal/stream.ts:5034–5066  ·  view source on GitHub ↗
(
  ...streams: S
)

Source from the content-addressed store, hash-verified

5032
/**
 * Races the given streams against each other and keeps only one of them.
 *
 * A shared `Deferred` acts as the "a winner has been chosen" signal. The
 * first stream whose `takeWhile` predicate is evaluated records its index as
 * the winner and completes that signal. From then on the predicate only
 * holds for the winning index, and every other stream's `interruptWhen`
 * effect completes as soon as the signal is done, while the winner's
 * interruption effect never completes.
 *
 * All streams are merged with a concurrency equal to the number of streams.
 *
 * @internal
 */
export const raceAll = <S extends ReadonlyArray<Stream.Stream<any, any, any>>>(
  ...streams: S
): Stream.Stream<
  Stream.Stream.Success<S[number]>,
  Stream.Stream.Error<S[number]>,
  Stream.Stream.Context<S[number]>
> =>
  Deferred.make<void>().pipe(
    Effect.map((decided) => {
      // Index of the winning stream; stays `null` until some stream claims it.
      let leader: number | null = null

      // Claims the win for `position` if nobody has won yet, otherwise
      // reports whether `position` is the stream that already won.
      const claimOrCheck = (position: number): boolean => {
        if (leader !== null) {
          return leader === position
        }
        leader = position
        Deferred.unsafeDone(decided, Exit.void)
        return true
      }

      const contenders = streams.map((candidate, position) => {
        // Once a winner is known this completes for every non-winning
        // stream and never completes for the winner itself.
        const cutOffIfLoser = Deferred.await(decided).pipe(
          Effect.flatMap(() => (leader === position ? Effect.never : Effect.void))
        )
        return candidate.pipe(
          takeWhile(() => claimOrCheck(position)),
          interruptWhen(cutOffIfLoser)
        )
      })

      return mergeAll(contenders, { concurrency: streams.length })
    }),
    unwrap
  )
5067
5068/** @internal */
5069export const rechunk = dual<

Callers 1

stream.tsFile · 0.70

Calls 6

unsafeDoneMethod · 0.80
mergeAllFunction · 0.70
pipeMethod · 0.65
makeMethod · 0.65
mapMethod · 0.65
awaitMethod · 0.45

Tested by

no test coverage detected