MCPcopy
hub / github.com/Effect-TS/effect / unsafeReadableRead

Function unsafeReadableRead

packages/platform-node-shared/src/internal/stream.ts:254–311  ·  view source on GitHub ↗
(
  readable: Readable | NodeJS.ReadableStream,
  onError: (error: unknown) => E,
  exit: MutableRef.MutableRef<Exit.Exit<void, E> | undefined>,
  options: FromReadableOptions | undefined
)

Source from the content-addressed store, hash-verified

252 })
253
/**
 * Builds a channel that pulls data out of a Node readable stream.
 *
 * If `readable.readable` is already falsy, `Channel.void` is returned and no
 * listeners are attached. Otherwise `"readable"`, `"error"` and `"end"`
 * listeners are registered immediately (at call time, not when the channel
 * runs), which is presumably why the function is named "unsafe".
 *
 * @param readable - stream to read from
 * @param onError - maps a value received from the stream's `"error"` event
 * into the channel's error type `E`
 * @param exit - shared mutable cell; this function stores `Exit.fail(...)` in
 * it on `"error"` and `Exit.void` on `"end"`, and the read loop returns that
 * exit once `read()` yields nothing more
 * @param options - `chunkSize` (when truthy) is converted with `Number` and
 * passed to every `read()` call; `closeOnDone` set to `false` suppresses the
 * `destroy()` performed by the finalizer
 * @returns a channel that writes one `Chunk` per batch of values drained from
 * the stream, with a finalizer that detaches the listeners
 */
const unsafeReadableRead = <A, E>(
  readable: Readable | NodeJS.ReadableStream,
  onError: (error: unknown) => E,
  exit: MutableRef.MutableRef<Exit.Exit<void, E> | undefined>,
  options: FromReadableOptions | undefined
) => {
  if (!readable.readable) {
    return Channel.void
  }

  // The latch starts closed; every stream event below opens it so that a
  // parked read loop is resumed.
  const latch = Effect.unsafeMakeLatch(false)

  const handleReadable = () => {
    latch.unsafeOpen()
  }
  const handleError = (cause: unknown) => {
    // Record the failure before waking the loop so it observes the exit.
    exit.current = Exit.fail(onError(cause))
    latch.unsafeOpen()
  }
  const handleEnd = () => {
    // Record the successful completion before waking the loop.
    exit.current = Exit.void
    latch.unsafeOpen()
  }

  readable.on("readable", handleReadable)
  readable.on("error", handleError)
  readable.on("end", handleEnd)

  let chunkSize: number | undefined = undefined
  if (options?.chunkSize) {
    chunkSize = Number(options.chunkSize)
  }

  const read = Channel.suspend(function pull(): Channel.Channel<Chunk.Chunk<A>, unknown, E> {
    // Drain everything `read()` currently hands out, stopping at the first null.
    const drained: Array<A> = []
    for (
      let next = readable.read(chunkSize) as A | null;
      next !== null;
      next = readable.read(chunkSize) as A | null
    ) {
      drained.push(next)
    }

    if (drained.length > 0) {
      // Emit the batch as a single chunk, then go around again.
      return Channel.flatMap(Channel.write(Chunk.unsafeFromArray(drained)), pull)
    }

    // Nothing was available: finish with the recorded exit if there is one...
    if (exit.current) {
      return Channel.fromEffect(exit.current)
    }

    // ...otherwise close the latch and wait for the next stream event.
    latch.unsafeClose()
    return Channel.flatMap(latch.await, pull)
  })

  const release = Effect.sync(() => {
    readable.off("readable", handleReadable)
    readable.off("error", handleError)
    readable.off("end", handleEnd)
    // Destroy the stream unless the caller opted out, and only when the
    // object exposes a `closed` property that is still falsy.
    if (options?.closeOnDone !== false && "closed" in readable && !readable.closed) {
      readable.destroy()
    }
  })

  return Channel.ensuring(read, release)
}

Callers 2

fromDuplexFunction · 0.85
fromReadableChannelFunction · 0.85

Calls 7

NumberInterface · 0.85
readMethod · 0.80
fromEffectMethod · 0.80
unsafeCloseMethod · 0.80
syncMethod · 0.80
destroyMethod · 0.80
writeMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…