MCPcopy Index your code
hub / github.com/Effect-TS/effect / forEachConcurrentDiscard

Function forEachConcurrentDiscard

packages/effect/src/internal/fiberRuntime.ts:2193–2402  ·  view source on GitHub ↗
(
  self: Iterable<A>,
  f: (a: A, i: number) => Effect.Effect<X, E, R>,
  batching: boolean,
  processAll: boolean,
  n?: number
)

Source from the content-addressed store, hash-verified

2191
2192/** @internal */
2193export const forEachConcurrentDiscard = <A, X, E, R>(
2194 self: Iterable<A>,
2195 f: (a: A, i: number) => Effect.Effect<X, E, R>,
2196 batching: boolean,
2197 processAll: boolean,
2198 n?: number
2199): Effect.Effect<void, E, R> =>
2200 core.uninterruptibleMask((restore) =>
2201 core.transplant((graft) =>
2202 core.withFiberRuntime<void, E, R>((parent) => {
2203 let todos = Array.from(self).reverse()
2204 let target = todos.length
2205 if (target === 0) {
2206 return core.void
2207 }
2208 let counter = 0
2209 let interrupted = false
2210 const fibersCount = n ? Math.min(todos.length, n) : todos.length
2211 const fibers = new Set<FiberRuntime<Exit.Exit<X, E> | Effect.Blocked<X, E>>>()
2212 const results = new Array()
2213 const interruptAll = () =>
2214 fibers.forEach((fiber) => {
2215 fiber.currentScheduler.scheduleTask(
2216 () => {
2217 fiber.unsafeInterruptAsFork(parent.id())
2218 },
2219 0,
2220 fiber
2221 )
2222 })
2223 const startOrder = new Array<FiberRuntime<Exit.Exit<X, E> | Effect.Blocked<X, E>>>()
2224 const joinOrder = new Array<FiberRuntime<Exit.Exit<X, E> | Effect.Blocked<X, E>>>()
2225 const residual = new Array<core.Blocked>()
2226 const collectExits = () => {
2227 const exits: Array<Exit.Exit<any, E>> = results
2228 .filter(({ exit }) => exit._tag === "Failure")
2229 .sort((a, b) => a.index < b.index ? -1 : a.index === b.index ? 0 : 1)
2230 .map(({ exit }) => exit)
2231 if (exits.length === 0) {
2232 exits.push(core.exitVoid)
2233 }
2234 return exits
2235 }
2236 const runFiber = <A, E, R>(eff: Effect.Effect<A, E, R>, interruptImmediately = false) => {
2237 const runnable = core.uninterruptible(graft(eff))
2238 const fiber = unsafeForkUnstarted(
2239 runnable,
2240 parent,
2241 parent.currentRuntimeFlags,
2242 fiberScope.globalScope
2243 )
2244 parent.currentScheduler.scheduleTask(
2245 () => {
2246 if (interruptImmediately) {
2247 fiber.unsafeInterruptAsFork(parent.id())
2248 }
2249 fiber.resume(runnable)
2250 },

Callers 5

runBlockedRequestsFunction · 0.85
fiberRuntime.tsFile · 0.85
forEachParUnboundedFunction · 0.85
nextFunction · 0.85
forEachParNFunction · 0.85

Calls 8

runFiberFunction · 0.85
restoreFunction · 0.85
onInterruptSignalFunction · 0.85
checkFunction · 0.85
nextFunction · 0.70
onExitMethod · 0.65
joinMethod · 0.65
addObserverMethod · 0.65

Tested by

no test coverage detected