MCPcopy
hub / github.com/Effect-TS/effect / race

Function race

packages/effect/src/internal/sink.ts:1660–1695  ·  view source on GitHub ↗
(scope: Scope.Scope)

Source from the content-addressed store, hash-verified

1658 }
1659 ): Sink.Sink<A3 | A4, In & In2, L2 | L, E2 | E, R2 | R> => {
/**
 * Builds, inside the supplied `scope`, a sink that feeds the same input to
 * both `self` and `options.other` and merges their outcomes with
 * `options.onSelfDone` / `options.onOtherDone`.
 *
 * Wiring:
 * - a bounded `PubSub` (capacity `options.capacity`, defaulting to 16) sits
 *   between the upstream input and the two sinks;
 * - each sink consumes from its own subscription to that pubsub;
 * - the channel that fills the pubsub is merged with the channel that runs
 *   the two sinks, and the result is wrapped in a `SinkImpl`.
 *
 * @param scope - scope that both pubsub subscriptions are extended into via
 * `Scope.extend`.
 * @returns an effect producing the combined sink.
 */
function race(scope: Scope.Scope) {
  return Effect.gen(function*() {
    // `options` is dereferenced unconditionally further down
    // (`options.other`, `options.onSelfDone`, `options.onOtherDone`), so a
    // plain property access is used here as well; only `capacity` itself
    // may be absent, which `??` covers.
    const pubsub = yield* PubSub.bounded<
      Either.Either<Chunk.Chunk<In & In2>, Exit.Exit<unknown>>
    >(options.capacity ?? 16)

    // Both subscriptions are created up front, before any channel is built,
    // and are attached to the caller-provided scope.
    const subscription1 = yield* Scope.extend(PubSub.subscribe(pubsub), scope)
    const subscription2 = yield* Scope.extend(PubSub.subscribe(pubsub), scope)

    // Channel that forwards the incoming input into the pubsub.
    const reader = channel.toPubSub(pubsub)

    // Channel that drives both sinks: each side reads from its own
    // subscription, pipes into the corresponding sink's channel and then
    // shuts its subscription down. The caller-supplied handlers decide how
    // the two completions are combined.
    const writer = channel.fromQueue(subscription1).pipe(
      core.pipeTo(toChannel(self)),
      channel.zipLeft(core.fromEffect(Queue.shutdown(subscription1))),
      channel.mergeWith({
        other: channel.fromQueue(subscription2).pipe(
          core.pipeTo(toChannel(options.other)),
          channel.zipLeft(core.fromEffect(Queue.shutdown(subscription2)))
        ),
        onSelfDone: options.onSelfDone,
        onOtherDone: options.onOtherDone
      })
    )

    // When the reader finishes, keep waiting for the writer and pass its
    // exit through unchanged (`Await(identity)`); when the writer finishes,
    // the merged channel is done with the writer's exit.
    const racedChannel = channel.mergeWith(reader, {
      other: writer,
      onSelfDone: () => mergeDecision.Await(identity),
      onOtherDone: (exit) => mergeDecision.Done(exit)
    }) as Channel.Channel<
      Chunk.Chunk<L | L2>,
      Chunk.Chunk<In & In2>,
      E | E2,
      never,
      A3 | A4,
      unknown,
      R | R2
    >
    return new SinkImpl(racedChannel)
  })
}
1696 return unwrapScopedWith(race)
1697 }
1698)

Callers

nothing calls this directly

Calls 5

fromEffectMethod · 0.80
toChannelFunction · 0.70
subscribeMethod · 0.65
pipeMethod · 0.65
shutdownMethod · 0.45

Tested by

no test coverage detected