(endTake: Exit.Exit<never, Option.Option<E>>)
| 2173 | ) |
| 2174 | ) |
| 2175 | const finalize = (endTake: Exit.Exit<never, Option.Option<E>>): Effect.Effect<void> => |
| 2176 | // Make sure that no queues are currently being added: the whole finalizer runs while holding one permit of queuesLock. In order it (1) replaces the effect stored in newQueue, (2) offers endTake to every queue currently in queuesRef, ignoring interruption of an individual offer, and (3) runs done(endTake), discarding the result |
| 2177 | queuesLock.withPermits(1)( |
| 2178 | pipe( |
| 2179 | Ref.set( |
| 2180 | newQueue, |
| 2181 | pipe( |
| 2182 | // All newly created queues should end immediately: the replacement effect builds a queue bounded to 1, offers endTake to it up front, registers it in queuesRef under a fresh id, and yields the [id, queue] tuple |
| 2183 | Queue.bounded<Exit.Exit<A, Option.Option<E>>>(1), |
| 2184 | Effect.tap((queue) => Queue.offer(queue, endTake)), |
| 2185 | Effect.flatMap((queue) => { |
| 2186 | const id = newDistributedWithDynamicId() |
| 2187 | return pipe( |
| 2188 | Ref.update(queuesRef, (map) => map.set(id, queue)), |
| 2189 | Effect.as(Tuple.make(id, queue)) |
| 2190 | ) |
| 2191 | }) |
| 2192 | ) |
| 2193 | ), |
| 2194 | Effect.zipRight( |
| 2195 | pipe( |
| 2196 | Ref.get(queuesRef), |
| 2197 | Effect.flatMap((map) => |
| 2198 | pipe( |
| 2199 | Chunk.fromIterable(map.values()), |
| 2200 | Effect.forEach((queue) => |
| 2201 | pipe( |
| 2202 | Queue.offer(queue, endTake), |
| 2203 | Effect.catchSomeCause((cause) => |
| 2204 | Cause.isInterrupted(cause) ? Option.some(Effect.void) : Option.none() |
| 2205 | ) |
| 2206 | ) |
| 2207 | ) |
| 2208 | ) |
| 2209 | ) |
| 2210 | ) |
| 2211 | ), |
| 2212 | Effect.zipRight(done(endTake)), |
| 2213 | Effect.asVoid |
| 2214 | ) |
| 2215 | ) |
| 2216 | yield* pipe( |
| 2217 | self, |
| 2218 | runForEachScoped(offer), |
no test coverage detected