MCPcopy Index your code
hub / github.com/Effect-TS/effect / parseStream

Function parseStream

packages/rpc/src/RpcSchema.ts:97–121  ·  view source on GitHub ↗
(
  decodeSuccess: (
    u: Chunk.Chunk<unknown>,
    overrideOptions?: AST.ParseOptions
  ) => Effect.Effect<Chunk.Chunk<A>, ParseResult.ParseIssue, RA>,
  decodeFailure: (u: unknown, overrideOptions?: AST.ParseOptions) => Effect.Effect<E, ParseResult.ParseIssue, RE>
)

Source from the content-addressed store, hash-verified

95const isStream = (u: unknown): u is Stream_.Stream<unknown, unknown> => hasProperty(u, Stream_.StreamTypeId)
96
97const parseStream = <A, E, RA, RE>(
98 decodeSuccess: (
99 u: Chunk.Chunk<unknown>,
100 overrideOptions?: AST.ParseOptions
101 ) => Effect.Effect<Chunk.Chunk<A>, ParseResult.ParseIssue, RA>,
102 decodeFailure: (u: unknown, overrideOptions?: AST.ParseOptions) => Effect.Effect<E, ParseResult.ParseIssue, RE>
103) =>
104(u: unknown, options: AST.ParseOptions, ast: AST.AST) =>
105 Effect.flatMap(
106 Effect.context<RA | RE>(),
107 (context) => {
108 if (!isStream(u)) return Effect.fail(new ParseResult.Type(ast, u))
109 return Effect.succeed(u.pipe(
110 Stream_.mapChunksEffect((value) => decodeSuccess(value, options)),
111 Stream_.catchAll((error) => {
112 if (ParseResult.isParseError(error)) return Stream_.die(error)
113 return Effect.matchEffect(decodeFailure(error, options), {
114 onFailure: Effect.die,
115 onSuccess: Effect.fail
116 })
117 }),
118 Stream_.provideContext(context)
119 ))
120 }
121 )

Callers 1

StreamFunction · 0.85

Calls 6

isStreamFunction · 0.70
contextMethod · 0.65
failMethod · 0.65
pipeMethod · 0.65
dieMethod · 0.65
decodeSuccessFunction · 0.50

Tested by

no test coverage detected