| 2117 | Effect.flatMap((queuesRef) => |
| 2118 | Effect.gen(function*() { |
/**
 * Publishes `a` to the registered queues selected for it.
 *
 * `decide(a)` yields a predicate over queue ids. The current contents of
 * `queuesRef` are read, and every queue whose id satisfies the predicate is
 * offered `Exit.succeed(a)`. An offer that fails with an interruption cause is
 * not treated as an error: the id is recorded and, once all queues have been
 * visited, the recorded ids are deleted from the map held in `queuesRef`.
 * Any other failure cause is propagated unchanged.
 *
 * @param a - The element to publish.
 * @returns An effect that performs the offers and discards the result.
 */
const offer = (a: A): Effect.Effect<void> =>
  Effect.asVoid(
    Effect.flatMap(decide(a), (accepts) =>
      Effect.flatMap(Ref.get(queuesRef), (registered) => {
        // Visit every registered queue, accumulating the ids of queues
        // whose offer was interrupted.
        const collectInterrupted = pipe(
          registered.entries(),
          Effect.reduce(Chunk.empty<number>(), (interruptedIds, [id, queue]) => {
            if (!accepts(id)) {
              return Effect.succeed(interruptedIds)
            }
            return Effect.matchCauseEffect(Queue.offer(queue, Exit.succeed(a)), {
              onFailure: (cause) => {
                // Ignore all downstream queues that were shut down and
                // remove them later
                if (Cause.isInterrupted(cause)) {
                  return Effect.succeed(Chunk.prepend(interruptedIds, id))
                }
                return Effect.failCause(cause)
              },
              onSuccess: () => Effect.succeed(interruptedIds)
            })
          })
        )

        return Effect.flatMap(collectInterrupted, (interruptedIds) => {
          if (!Chunk.isNonEmpty(interruptedIds)) {
            return Effect.void
          }
          // Drop the recorded ids from the same map instance stored in the ref.
          return Ref.update(queuesRef, (current) => {
            for (const staleId of interruptedIds) {
              current.delete(staleId)
            }
            return current
          })
        })
      })
    )
  )
| 2162 | const queuesLock = yield* Effect.makeSemaphore(1) |
| 2163 | const newQueue = yield* Ref.make<Effect.Effect<[number, Queue.Queue<Exit.Exit<A, Option.Option<E>>>]>>( |
| 2164 | pipe( |