MCPcopy
hub / github.com/Effect-TS/effect / streamEffect

Function streamEffect

packages/rpc/src/RpcServer.ts:356–403  ·  view source on GitHub ↗
(
    client: Client,
    request: Request<Rpcs>,
    stream: Stream.Stream<any, any> | Effect.Effect<Mailbox.ReadonlyMailbox<any, any>, any, Scope.Scope>
  )

Source from the content-addressed store, hash-verified

354 }
355
/**
 * Forwards every non-empty chunk produced by `stream` to
 * `options.onFromServer` as a `"Chunk"` message tagged with the client id
 * and request id.
 *
 * `stream` is either a `Stream`, consumed chunk by chunk, or an effect
 * yielding a mailbox, which is drained with `takeAll` until the mailbox
 * reports completion. The mailbox effect is run inside `Effect.scoped`.
 *
 * When a latch is available for the request, it is closed before each write
 * and awaited after it, so the next chunk is not sent until the latch is
 * opened again.
 * NOTE(review): presumably the latch is opened by the client's ack handler,
 * which is not visible in this block — confirm.
 *
 * @param client - client whose `id` and `latches` registry are used
 * @param request - request whose `id` tags each emitted chunk
 * @param stream - the stream, or mailbox-producing effect, to forward
 * @returns an effect that completes once the source has been fully forwarded
 */
const streamEffect = (
  client: Client,
  request: Request<Rpcs>,
  stream: Stream.Stream<any, any> | Effect.Effect<Mailbox.ReadonlyMailbox<any, any>, any, Scope.Scope>
) => {
  // Reuse a latch already registered for this request; otherwise create and
  // register a new one (initially closed), but only when acks are supported.
  let ackLatch = client.latches.get(request.id)
  if (supportsAck && !ackLatch) {
    ackLatch = Effect.unsafeMakeLatch(false)
    client.latches.set(request.id, ackLatch)
  }

  // Sends one chunk to the client. Empty chunks are dropped. With a latch,
  // the latch is closed up front and awaited once the write has run.
  const forwardChunk = (chunk: Chunk.Chunk<any>) => {
    if (!Chunk.isNonEmpty(chunk)) return Effect.void
    const write = options.onFromServer({
      _tag: "Chunk",
      clientId: client.id,
      requestId: request.id,
      values: Chunk.toReadonlyArray(chunk)
    })
    if (!ackLatch) return write
    ackLatch.unsafeClose()
    return Effect.zipRight(write, ackLatch.await)
  }

  // Plain stream: hand each chunk straight to the forwarder.
  if (!Effect.isEffect(stream)) {
    return Stream.runForEachChunk(stream, forwardChunk)
  }

  // Mailbox: keep taking everything available until it signals completion.
  let finished = false
  const drainMailbox = (mailbox: Mailbox.ReadonlyMailbox<any, any>) => {
    const takeAndForward = Effect.flatMap(mailbox.takeAll, ([chunk, isDone]) => {
      finished = isDone
      return forwardChunk(chunk)
    })
    return Effect.whileLoop({
      while: () => !finished,
      body: constant(takeAndForward),
      step: constVoid
    })
  }
  return stream.pipe(
    Effect.flatMap(drainMailbox),
    Effect.scoped
  )
}
404
405 const sendDefect = (client: Client, defect: unknown) =>
406 Effect.suspend(() => {

Callers 1

handleRequestFunction · 0.85

Calls 5

constantFunction · 0.85
unsafeCloseMethod · 0.80
getMethod · 0.65
setMethod · 0.65
pipeMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…