MCPcopy
hub / github.com/Effect-TS/effect / forEach

Function forEach

packages/effect/src/Micro.ts:3878–3970  ·  view source on GitHub ↗
(iterable: Iterable<A>, f: (a: A, index: number) => Micro<B, E, R>, options?: {
  readonly concurrency?: Concurrency | undefined
  readonly discard?: boolean | undefined
})

Source from the content-addressed store, hash-verified

3876 readonly discard: true
3877 }): Micro<void, E, R>
3878} = <
3879 A,
3880 B,
3881 E,
3882 R
3883>(iterable: Iterable<A>, f: (a: A, index: number) => Micro<B, E, R>, options?: {
3884 readonly concurrency?: Concurrency | undefined
3885 readonly discard?: boolean | undefined
3886}): Micro<any, E, R> =>
3887 withMicroFiber((parent) => {
3888 const concurrencyOption = options?.concurrency === "inherit"
3889 ? parent.getRef(CurrentConcurrency)
3890 : options?.concurrency ?? 1
3891 const concurrency = concurrencyOption === "unbounded"
3892 ? Number.POSITIVE_INFINITY
3893 : Math.max(1, concurrencyOption)
3894
3895 const items = Arr.fromIterable(iterable)
3896 let length = items.length
3897 if (length === 0) {
3898 return options?.discard ? void_ : succeed([])
3899 }
3900
3901 const out: Array<B> | undefined = options?.discard ? undefined : new Array(length)
3902 let index = 0
3903
3904 if (concurrency === 1) {
3905 return as(
3906 whileLoop({
3907 while: () => index < items.length,
3908 body: () => f(items[index], index),
3909 step: out ?
3910 (b) => out[index++] = b :
3911 (_) => index++
3912 }),
3913 out as any
3914 )
3915 }
3916 return async((resume) => {
3917 const fibers = new Set<MicroFiber<unknown, unknown>>()
3918 let result: MicroExit<any, any> | undefined = undefined
3919 let inProgress = 0
3920 let doneCount = 0
3921 let pumping = false
3922 let interrupted = false
3923 function pump() {
3924 pumping = true
3925 while (inProgress < concurrency && index < length) {
3926 const currentIndex = index
3927 const item = items[currentIndex]
3928 index++
3929 inProgress++
3930 try {
3931 const child = unsafeFork(parent, f(item, currentIndex), true, true)
3932 fibers.add(child)
3933 child.addObserver((exit) => {
3934 fibers.delete(child)
3935 if (interrupted) {

Callers 4

close · Method · 0.70
all · Function · 0.70
filterMap · Function · 0.70
SortedSet.ts · File · 0.70

Calls 9

as · Function · 0.85
whileLoop · Function · 0.85
async · Function · 0.85
pump · Function · 0.85
fiberInterruptAll · Function · 0.85
getRef · Method · 0.80
f · Function · 0.70
suspend · Function · 0.70
succeed · Function · 0.50

Tested by

no test coverage detected