(scope: Scope.Scope)
| 1658 | } |
| 1659 | ): Sink.Sink<A3 | A4, In & In2, L2 | L, E2 | E, R2 | R> => { |
| 1660 | function race(scope: Scope.Scope) { /* Builds the raced sink as an Effect; both pubsub subscriptions created below are extended into `scope`. */ |
| 1661 | return Effect.gen(function*() { |
| 1662 | const pubsub = yield* PubSub.bounded< /* Bounded pubsub whose messages are either an input chunk or an Exit. */ |
| 1663 | Either.Either<Chunk.Chunk<In & In2>, Exit.Exit<unknown>> |
| 1664 | >(options?.capacity ?? 16) /* Capacity falls back to 16 when options.capacity is null or undefined. */ |
| 1665 | const subscription1 = yield* Scope.extend(PubSub.subscribe(pubsub), scope) /* Subscription consumed by `self` (see writer below). */ |
| 1666 | const subscription2 = yield* Scope.extend(PubSub.subscribe(pubsub), scope) /* Subscription consumed by options.other. Both subscriptions are created before the reader and writer channels are defined. */ |
| 1667 | const reader = channel.toPubSub(pubsub) /* NOTE(review): presumably forwards upstream input chunks and the upstream exit into the pubsub; confirm against channel.toPubSub. */ |
| 1668 | const writer = channel.fromQueue(subscription1).pipe( /* writer: runs `self` and options.other, each fed from its own subscription, merged with the caller-supplied decisions. */ |
| 1669 | core.pipeTo(toChannel(self)), |
| 1670 | channel.zipLeft(core.fromEffect(Queue.shutdown(subscription1))), /* Sequences a shutdown of subscription1 after the `self` pipeline. */ |
| 1671 | channel.mergeWith({ |
| 1672 | other: channel.fromQueue(subscription2).pipe( |
| 1673 | core.pipeTo(toChannel(options.other)), |
| 1674 | channel.zipLeft(core.fromEffect(Queue.shutdown(subscription2))) /* Same shutdown step for subscription2 after the options.other pipeline. */ |
| 1675 | ), |
| 1676 | onSelfDone: options.onSelfDone, /* Merge decisions are passed through from the caller unchanged. */ |
| 1677 | onOtherDone: options.onOtherDone |
| 1678 | }) |
| 1679 | ) |
| 1680 | const racedChannel = channel.mergeWith(reader, { /* Outer merge: the reader runs alongside the writer. */ |
| 1681 | other: writer, |
| 1682 | onSelfDone: () => mergeDecision.Await(identity), /* Reader finished: Await the writer with identity (presumably passing the writer's outcome through unchanged; confirm against mergeDecision.Await). */ |
| 1683 | onOtherDone: (exit) => mergeDecision.Done(exit) /* Writer finished: the merged channel is Done with the writer's exit. */ |
| 1684 | }) as Channel.Channel< /* Type assertion: the channel type below is asserted here, not inferred from mergeWith. */ |
| 1685 | Chunk.Chunk<L | L2>, |
| 1686 | Chunk.Chunk<In & In2>, |
| 1687 | E | E2, |
| 1688 | never, |
| 1689 | A3 | A4, |
| 1690 | unknown, |
| 1691 | R | R2 |
| 1692 | > |
| 1693 | return new SinkImpl(racedChannel) /* Wraps the merged channel in a SinkImpl, which is the value this Effect yields. */ |
| 1694 | }) |
| 1695 | } |
| 1696 | return unwrapScopedWith(race) |
| 1697 | } |
| 1698 | ) |
nothing calls this directly
no test coverage detected