MCPcopy
hub / github.com/Effect-TS/effect / mergeAllWith

Function mergeAllWith

packages/effect/src/internal/channel.ts:1086–1279  ·  view source on GitHub ↗
(
  {
    bufferSize = 16,
    concurrency,
    mergeStrategy = mergeStrategy_.BackPressure()
  }: {
    readonly concurrency: number | "unbounded"
    readonly bufferSize?: number | undefined
    readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined
  }
)

Source from the content-addressed store, hash-verified

1084
1085/** @internal */
1086export const mergeAllWith = (
1087 {
1088 bufferSize = 16,
1089 concurrency,
1090 mergeStrategy = mergeStrategy_.BackPressure()
1091 }: {
1092 readonly concurrency: number | "unbounded"
1093 readonly bufferSize?: number | undefined
1094 readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined
1095 }
1096) =>
1097<OutElem, InElem1, OutErr1, InErr1, OutDone, InDone1, Env1, InElem, OutErr, InErr, InDone, Env>(
1098 channels: Channel.Channel<
1099 Channel.Channel<OutElem, InElem1, OutErr1, InErr1, OutDone, InDone1, Env1>,
1100 InElem,
1101 OutErr,
1102 InErr,
1103 OutDone,
1104 InDone,
1105 Env
1106 >,
1107 f: (o1: OutDone, o2: OutDone) => OutDone
1108): Channel.Channel<
1109 OutElem,
1110 InElem & InElem1,
1111 OutErr | OutErr1,
1112 InErr & InErr1,
1113 OutDone,
1114 InDone & InDone1,
1115 Env | Env1
1116> =>
1117 unwrapScopedWith(
1118 (scope) =>
1119 Effect.gen(function*() {
1120 const concurrencyN = concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : concurrency
1121 const input = yield* singleProducerAsyncInput.make<
1122 InErr & InErr1,
1123 InElem & InElem1,
1124 InDone & InDone1
1125 >()
1126 const queueReader = fromInput(input)
1127 const queue = yield* Queue.bounded<Effect.Effect<Either.Either<OutElem, OutDone>, OutErr | OutErr1, Env>>(
1128 bufferSize
1129 )
1130 yield* Scope.addFinalizer(scope, Queue.shutdown(queue))
1131 const cancelers = yield* Queue.unbounded<Deferred.Deferred<void>>()
1132 yield* Scope.addFinalizer(scope, Queue.shutdown(cancelers))
1133 const lastDone = yield* Ref.make<Option.Option<OutDone>>(Option.none())
1134 const errorSignal = yield* Deferred.make<void>()
1135 const withPermits = (yield* Effect.makeSemaphore(concurrencyN)).withPermits
1136 const pull = yield* toPullIn(core.pipeTo(queueReader, channels), scope)
1137
1138 function evaluatePull(
1139 pull: Effect.Effect<
1140 Either.Either<OutElem, OutDone>,
1141 OutErr | OutErr1,
1142 Env | Env1
1143 >

Callers 4

mergeAllFunction · 0.85
mergeAllUnboundedFunction · 0.85
mergeAllUnboundedWithFunction · 0.85
channel.tsFile · 0.85

Calls 15

evaluatePullFunction · 0.85
failCauseMethod · 0.80
unwrapScopedWithFunction · 0.70
fromInputFunction · 0.70
pipeFunction · 0.70
makeMethod · 0.65
addFinalizerMethod · 0.65
pipeMethod · 0.65
offerMethod · 0.65
getMethod · 0.65
sizeMethod · 0.65
takeMethod · 0.65

Tested by

no test coverage detected