| 611 | |
/**
 * Creates a stream that is fed by a push-style emitter handed to `register`.
 *
 * A queue is obtained from `queueFromBufferOptionsPush(options)` and paired
 * with `Queue.shutdown` as its release action. `register` is then invoked with
 * an emitter built by `emit.makePush` from that queue and the scheduler read
 * from `FiberRef.currentScheduler`. The resulting channel repeatedly takes
 * from the queue:
 *
 * - a non-`Exit` item is written downstream as a chunk and the loop continues;
 * - a successful `Exit` ends the channel;
 * - a failed `Exit` fails the channel with that exit's cause.
 *
 * @param register - Receives the push emitter; its effect may require a
 *   `Scope`, which is removed from the requirements of the returned stream.
 * @param options - Buffer configuration, forwarded unchanged to
 *   `queueFromBufferOptionsPush` (which decides how `bufferSize` and
 *   `strategy` are interpreted).
 * @returns A stream emitting the values pushed through the emitter.
 *
 * @internal
 */
export const asyncPush = <A, E = never, R = never>(
  register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
  options?: {
    readonly bufferSize: "unbounded"
  } | {
    readonly bufferSize?: number | undefined
    readonly strategy?: "dropping" | "sliding" | undefined
  } | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
  Effect.acquireRelease(
    queueFromBufferOptionsPush<A, E>(options),
    Queue.shutdown
  ).pipe(
    // Hand the caller an emitter backed by the freshly acquired queue.
    Effect.tap((buffer) => {
      return FiberRef.getWith(
        FiberRef.currentScheduler,
        (currentScheduler) => register(emit.makePush(buffer, currentScheduler))
      )
    }),
    // Build the channel that drains the queue until an `Exit` is taken.
    Effect.map((buffer) => {
      const drain: Channel.Channel<Chunk.Chunk<A>, unknown, E> = core.flatMap(
        Queue.take(buffer),
        (taken) => {
          if (!Exit.isExit(taken)) {
            // Plain batch of values: emit it, then keep draining.
            return channel.zipRight(core.write(Chunk.unsafeFromArray(taken)), drain)
          }
          // Terminal signal: finish cleanly on success, otherwise propagate the cause.
          return Exit.isSuccess(taken) ? core.void : core.failCause(taken.cause)
        }
      )
      return drain
    }),
    channel.unwrapScoped,
    fromChannel
  )
| 639 | |
| 640 | /** @internal */ |
| 641 | export const asyncScoped = <A, E = never, R = never>( |