MCPcopy Index your code
hub / github.com/Effect-TS/effect / chunkCoordination

Function chunkCoordination

packages/effect/test/utils/coordination.ts:16–60  ·  view source on GitHub ↗
(
  _chunks: Iterable<Chunk.Chunk<A>>
)

Source from the content-addressed store, hash-verified

14}
15
/**
 * Creates a `ChunkCoordination` that releases the supplied chunks to a queue
 * one batch at a time, under the caller's control.
 *
 * Every chunk except the last becomes its own single-element batch holding
 * `Exit.succeed(chunk)`. The last chunk (when there is one) is batched together
 * with a trailing `Exit.fail(Option.none())`, so both are released by the same
 * `offer`. NOTE(review): the `Option.none()` failure presumably signals
 * end-of-stream to the consumer — confirm against the tests using this helper.
 *
 * The returned record exposes:
 * - `queue`: the unbounded queue that receives the released signals;
 * - `offer`: removes the next pending batch and offers all of its signals to
 *   `queue`; once no batches remain it offers an empty chunk instead;
 * - `proceed`: places a `void` token on a second, internal unbounded queue;
 * - `awaitNext`: takes a token from that internal queue.
 *
 * @param _chunks - The chunks to hand out, in iteration order.
 * @returns An effect that allocates the queues and state and yields the
 * coordination handle.
 */
export const chunkCoordination = <A>(
  _chunks: Iterable<Chunk.Chunk<A>>
): Effect.Effect<ChunkCoordination<A>> =>
  Effect.gen(function*() {
    // Local shorthands for the signal type that flows through the queue.
    type Signal = Exit.Exit<Chunk.Chunk<A>, Option.Option<never>>
    type Batch = Chunk.Chunk<Signal>

    const source = Chunk.fromIterable(_chunks)

    // All chunks but the last: one success signal per batch.
    const leadingBatches = pipe(
      source,
      Chunk.dropRight(1),
      Chunk.map((chunk) => Chunk.of(Exit.succeed(chunk)))
    )

    // The last chunk shares its batch with the terminating failure signal;
    // with no input chunks at all there is no closing batch.
    const closingBatch = Option.match(Chunk.last(source), {
      onNone: () => Chunk.empty<Batch>(),
      onSome: (finalChunk) =>
        Chunk.of(
          Chunk.unsafeFromArray<Signal>([
            Exit.succeed(finalChunk),
            Exit.fail(Option.none())
          ])
        )
    })

    const queue = yield* Queue.unbounded<Signal>()
    const ps = yield* Queue.unbounded<void>()
    const ref = yield* Ref.make<Chunk.Chunk<Batch>>(
      Chunk.appendAll(leadingBatches, closingBatch)
    )

    // Pops the head batch from the pending list (or yields an empty batch
    // when the list is exhausted) in a single Ref.modify call.
    const takeNextBatch = Ref.modify(ref, (pending) => {
      if (Chunk.isEmpty(pending)) {
        return [Chunk.empty(), Chunk.empty()]
      }
      return [Chunk.unsafeHead(pending), Chunk.drop(pending, 1)]
    })

    const offer = Effect.asVoid(
      Effect.flatMap(takeNextBatch, (batch) => Queue.offerAll(queue, batch))
    )
    const proceed = Effect.asVoid(Queue.offer(ps, void 0))
    const awaitNext = Queue.take(ps)

    return { queue, offer, proceed, awaitNext }
  })

Callers 7

throttling.test.tsFile · 0.85
grouping.test.tsFile · 0.85
halting.test.tsFile · 0.85
timeouts.test.tsFile · 0.85

Calls 11

lastMethod · 0.80
makeMethod · 0.65
mapMethod · 0.65
ofMethod · 0.65
failMethod · 0.65
modifyMethod · 0.65
isEmptyMethod · 0.65
offerAllMethod · 0.65
offerMethod · 0.65
takeMethod · 0.65
pipeFunction · 0.50

Tested by

no test coverage detected