(
pull: Effect.Effect<
Either.Either<OutElem, OutDone>,
OutErr | OutErr1,
Env | Env1
>
)
| 1136 | const pull = yield* toPullIn(core.pipeTo(queueReader, channels), scope) |
| 1137 | |
/**
 * Drives `pull` until it reports completion, then records the completion value.
 *
 * Each `Right` produced by `pull` is wrapped and offered to `queue`; the first
 * `Left` ends the repetition and is merged into the `lastDone` ref (stored
 * as-is when the ref is empty, otherwise combined with the stored value via `f`).
 *
 * A failure whose cause is an interruption is re-raised unchanged. Any other
 * cause is offered to `queue` as a failed effect, after which `errorSignal`
 * is completed and the result is discarded.
 *
 * @param pull - Effect yielding either the next output element (`Right`) or
 * the done value (`Left`).
 */
function evaluatePull(
  pull: Effect.Effect<
    Either.Either<OutElem, OutDone>,
    OutErr | OutErr1,
    Env | Env1
  >
) {
  // One pull: `Some(done)` on completion, `None` once an element has been queued.
  const pullOnce = Effect.flatMap(
    pull,
    Either.match({
      onLeft: (done: OutDone) => Effect.succeed(Option.some(done)),
      onRight: (outElem: OutElem) =>
        Queue.offer(queue, Effect.succeed(Either.right(outElem))).pipe(
          Effect.as(Option.none())
        )
    })
  )

  // Keep pulling until a done value shows up.
  const pullUntilDone = Effect.repeat(pullOnce, {
    until: (result): result is Option.Some<OutDone> => Option.isSome(result)
  })

  // Fold the done value into whatever was stored before.
  const recordDone = Effect.flatMap(pullUntilDone, (outDone) =>
    Ref.update(
      lastDone,
      Option.match({
        onNone: () => Option.some(outDone.value),
        onSome: (previous) => Option.some(f(previous, outDone.value))
      })
    )
  )

  return Effect.catchAllCause(recordDone, (cause) => {
    // Interruptions are propagated untouched rather than published to the queue.
    if (Cause.isInterrupted(cause)) {
      return Effect.failCause(cause)
    }
    const publishFailure = Effect.zipRight(
      Queue.offer(queue, Effect.failCause(cause)),
      Deferred.succeed(errorSignal, void 0)
    )
    return Effect.asVoid(publishFailure)
  })
}
| 1174 | |
| 1175 | yield* pull.pipe( |
| 1176 | Effect.matchCauseEffect({ |
no test coverage detected