(
{
bufferSize = 16,
concurrency,
mergeStrategy = mergeStrategy_.BackPressure()
}: {
readonly concurrency: number | "unbounded"
readonly bufferSize?: number | undefined
readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined
}
)
| 1084 | |
| 1085 | /** @internal */ |
| 1086 | export const mergeAllWith = ( |
| 1087 | { |
| 1088 | bufferSize = 16, |
| 1089 | concurrency, |
| 1090 | mergeStrategy = mergeStrategy_.BackPressure() |
| 1091 | }: { |
| 1092 | readonly concurrency: number | "unbounded" |
| 1093 | readonly bufferSize?: number | undefined |
| 1094 | readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined |
| 1095 | } |
| 1096 | ) => |
| 1097 | <OutElem, InElem1, OutErr1, InErr1, OutDone, InDone1, Env1, InElem, OutErr, InErr, InDone, Env>( |
| 1098 | channels: Channel.Channel< |
| 1099 | Channel.Channel<OutElem, InElem1, OutErr1, InErr1, OutDone, InDone1, Env1>, |
| 1100 | InElem, |
| 1101 | OutErr, |
| 1102 | InErr, |
| 1103 | OutDone, |
| 1104 | InDone, |
| 1105 | Env |
| 1106 | >, |
| 1107 | f: (o1: OutDone, o2: OutDone) => OutDone |
| 1108 | ): Channel.Channel< |
| 1109 | OutElem, |
| 1110 | InElem & InElem1, |
| 1111 | OutErr | OutErr1, |
| 1112 | InErr & InErr1, |
| 1113 | OutDone, |
| 1114 | InDone & InDone1, |
| 1115 | Env | Env1 |
| 1116 | > => |
| 1117 | unwrapScopedWith( |
| 1118 | (scope) => |
| 1119 | Effect.gen(function*() { |
| 1120 | const concurrencyN = concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : concurrency |
| 1121 | const input = yield* singleProducerAsyncInput.make< |
| 1122 | InErr & InErr1, |
| 1123 | InElem & InElem1, |
| 1124 | InDone & InDone1 |
| 1125 | >() |
| 1126 | const queueReader = fromInput(input) |
| 1127 | const queue = yield* Queue.bounded<Effect.Effect<Either.Either<OutElem, OutDone>, OutErr | OutErr1, Env>>( |
| 1128 | bufferSize |
| 1129 | ) |
| 1130 | yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) |
| 1131 | const cancelers = yield* Queue.unbounded<Deferred.Deferred<void>>() |
| 1132 | yield* Scope.addFinalizer(scope, Queue.shutdown(cancelers)) |
| 1133 | const lastDone = yield* Ref.make<Option.Option<OutDone>>(Option.none()) |
| 1134 | const errorSignal = yield* Deferred.make<void>() |
| 1135 | const withPermits = (yield* Effect.makeSemaphore(concurrencyN)).withPermits |
| 1136 | const pull = yield* toPullIn(core.pipeTo(queueReader, channels), scope) |
| 1137 | |
| 1138 | function evaluatePull( |
| 1139 | pull: Effect.Effect< |
| 1140 | Either.Either<OutElem, OutDone>, |
| 1141 | OutErr | OutErr1, |
| 1142 | Env | Env1 |
| 1143 | > |
no test coverage detected