MCPcopy
hub / github.com/Effect-TS/effect / pump

Function pump

packages/effect/src/Micro.ts:3923–3961  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

3921 let pumping = false
3922 let interrupted = false
3923 function pump() {
3924 pumping = true
3925 while (inProgress < concurrency && index < length) {
3926 const currentIndex = index
3927 const item = items[currentIndex]
3928 index++
3929 inProgress++
3930 try {
3931 const child = unsafeFork(parent, f(item, currentIndex), true, true)
3932 fibers.add(child)
3933 child.addObserver((exit) => {
3934 fibers.delete(child)
3935 if (interrupted) {
3936 return
3937 } else if (exit._tag === "Failure") {
3938 if (result === undefined) {
3939 result = exit
3940 length = index
3941 fibers.forEach((fiber) => fiber.unsafeInterrupt())
3942 }
3943 } else if (out !== undefined) {
3944 out[currentIndex] = exit.value
3945 }
3946 doneCount++
3947 inProgress--
3948 if (doneCount === length) {
3949 resume(result ?? succeed(out))
3950 } else if (!pumping && inProgress < concurrency) {
3951 pump()
3952 }
3953 })
3954 } catch (err) {
3955 result = exitDie(err)
3956 length = index
3957 fibers.forEach((fiber) => fiber.unsafeInterrupt())
3958 }
3959 }
3960 pumping = false
3961 }
3962 pump()
3963
3964 return suspend(() => {

Callers 1

forEachFunction · 0.85

Calls 8

unsafeInterruptMethod · 0.80
unsafeForkFunction · 0.70
fFunction · 0.70
resumeFunction · 0.70
exitDieFunction · 0.70
addMethod · 0.65
addObserverMethod · 0.65
succeedFunction · 0.50

Tested by

no test coverage detected