| 184 | * @category combinators |
| 185 | */ |
| 186 | export const toChannelMap = <IE, A>( |
| 187 | self: Socket, |
| 188 | f: (data: Uint8Array | string) => A |
| 189 | ): Channel.Channel< |
| 190 | Chunk.Chunk<A>, |
| 191 | Chunk.Chunk<Uint8Array | string | CloseEvent>, |
| 192 | SocketError | IE, |
| 193 | IE, |
| 194 | void, |
| 195 | unknown |
| 196 | > => |
| 197 | Effect.gen(function*() { |
| 198 | const scope = yield* Effect.scope |
| 199 | const mailbox = yield* Mailbox.make<A, SocketError | IE>() |
| 200 | const writeScope = yield* Scope.fork(scope, ExecutionStrategy.sequential) |
| 201 | const write = yield* Scope.extend(self.writer, writeScope) |
| 202 | function* emit(chunk: Chunk.Chunk<Uint8Array | string | CloseEvent>) { |
| 203 | for (const data of chunk) { |
| 204 | yield* write(data) |
| 205 | } |
| 206 | } |
| 207 | const input: AsyncProducer.AsyncInputProducer<IE, Chunk.Chunk<Uint8Array | string | CloseEvent>, unknown> = { |
| 208 | awaitRead: () => Effect.void, |
| 209 | emit(chunk) { |
| 210 | return Effect.catchAllCause( |
| 211 | Effect.gen(() => emit(chunk)), |
| 212 | (cause) => mailbox.failCause(cause) |
| 213 | ) |
| 214 | }, |
| 215 | error(error) { |
| 216 | return Effect.zipRight( |
| 217 | Scope.close(writeScope, Exit.void), |
| 218 | mailbox.failCause(error) |
| 219 | ) |
| 220 | }, |
| 221 | done() { |
| 222 | return Scope.close(writeScope, Exit.void) |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | yield* self.runRaw((data) => { |
| 227 | mailbox.unsafeOffer(f(data)) |
| 228 | }).pipe( |
| 229 | Mailbox.into(mailbox), |
| 230 | Effect.forkIn(scope), |
| 231 | Effect.interruptible |
| 232 | ) |
| 233 | |
| 234 | return Channel.embedInput(Mailbox.toChannel(mailbox), input) |
| 235 | }).pipe(Channel.unwrapScoped) |
| 236 | |
| 237 | /** |
| 238 | * @since 1.0.0 |