| 3090 | readonly shutdown?: boolean | undefined |
| 3091 | }): Stream.Stream<A> |
| 3092 | } = (pubsub, options): any => { |
| 3093 | const maxChunkSize = options?.maxChunkSize ?? DefaultChunkSize |
| 3094 | |
| 3095 | if (options?.scoped) { |
| 3096 | const effect = Effect.map( |
| 3097 | PubSub.subscribe(pubsub), |
| 3098 | (queue) => fromQueue(queue, { maxChunkSize, shutdown: true }) |
| 3099 | ) |
| 3100 | |
| 3101 | return options.shutdown ? Effect.map(effect, ensuring(PubSub.shutdown(pubsub))) : effect |
| 3102 | } |
| 3103 | const stream = flatMap( |
| 3104 | scoped(PubSub.subscribe(pubsub)), |
| 3105 | (queue) => fromQueue(queue, { maxChunkSize }) |
| 3106 | ) |
| 3107 | return options?.shutdown ? ensuring(stream, PubSub.shutdown(pubsub)) : stream |
| 3108 | } |
| 3109 | |
| 3110 | /** @internal */ |
| 3111 | export const fromTPubSub = <A>(pubsub: TPubSub.TPubSub<A>): Stream.Stream<A> => { |