MCPcopy Index your code
hub / github.com/Effect-TS/effect / finalize

Function finalize

packages/effect/src/internal/stream.ts:2175–2215  ·  view source on GitHub ↗
(endTake: Exit.Exit<never, Option.Option<E>>)

Source from the content-addressed store, hash-verified

2173 )
2174 )
2175 const finalize = (endTake: Exit.Exit<never, Option.Option<E>>): Effect.Effect<void> =>
2176 // Make sure that no queues are currently being added
2177 queuesLock.withPermits(1)(
2178 pipe(
2179 Ref.set(
2180 newQueue,
2181 pipe(
2182 // All newly created queues should end immediately
2183 Queue.bounded<Exit.Exit<A, Option.Option<E>>>(1),
2184 Effect.tap((queue) => Queue.offer(queue, endTake)),
2185 Effect.flatMap((queue) => {
2186 const id = newDistributedWithDynamicId()
2187 return pipe(
2188 Ref.update(queuesRef, (map) => map.set(id, queue)),
2189 Effect.as(Tuple.make(id, queue))
2190 )
2191 })
2192 )
2193 ),
2194 Effect.zipRight(
2195 pipe(
2196 Ref.get(queuesRef),
2197 Effect.flatMap((map) =>
2198 pipe(
2199 Chunk.fromIterable(map.values()),
2200 Effect.forEach((queue) =>
2201 pipe(
2202 Queue.offer(queue, endTake),
2203 Effect.catchSomeCause((cause) =>
2204 Cause.isInterrupted(cause) ? Option.some(Effect.void) : Option.none()
2205 )
2206 )
2207 )
2208 )
2209 )
2210 )
2211 ),
2212 Effect.zipRight(done(endTake)),
2213 Effect.asVoid
2214 )
2215 )
2216 yield* pipe(
2217 self,
2218 runForEachScoped(offer),

Callers 1

stream.tsFile · 0.85

Calls 11

withPermitsMethod · 0.80
isInterruptedMethod · 0.80
pipeFunction · 0.70
doneFunction · 0.70
setMethod · 0.65
offerMethod · 0.65
updateMethod · 0.65
makeMethod · 0.65
getMethod · 0.65
valuesMethod · 0.45

Tested by

no test coverage detected