MCPcopy Index your code
hub / github.com/Effect-TS/effect / executeStream

Method executeStream

packages/sql-pg/src/PgClient.ts:245–272  ·  view source on GitHub ↗
(
        sql: string,
        params: ReadonlyArray<unknown>,
        transformRows: (<A extends object>(row: ReadonlyArray<A>) => ReadonlyArray<A>) | undefined
      )

Source from the content-addressed store, hash-verified

243 return this.execute(sql, params, transformRows)
244 }
/**
 * Executes `sql` as a `Cursor` query and exposes the rows as a `Stream`.
 *
 * Rows are read from the cursor in batches of up to 128; each non-empty
 * batch becomes one chunk of the stream. An empty batch ends the stream.
 *
 * @param sql - The statement text handed to the cursor.
 * @param params - Positional parameter values for the statement.
 * @param transformRows - Optional function applied to every batch of rows
 * before it is emitted; when `undefined` the rows are emitted as read.
 * @returns A stream of rows that fails with a `SqlError` ("Failed to execute
 * statement") when a cursor read reports an error. The cursor is closed when
 * the stream's scope closes.
 */
executeStream(
  sql: string,
  params: ReadonlyArray<unknown>,
  transformRows: (<A extends object>(row: ReadonlyArray<A>) => ReadonlyArray<A>) | undefined
) {
  // eslint-disable-next-line @typescript-eslint/no-this-alias
  const self = this
  return Effect.gen(function*() {
    // Use the client already attached to this connection when there is one;
    // otherwise obtain one via `reserveRaw`, which is tied to the ambient scope.
    const client = self.pg ?? (yield* reserveRaw)

    // Create the cursor and register its finalizer as a single step, so the
    // release action always receives an initialised cursor. Registering the
    // finalizer before the cursor existed left a window in which closing the
    // scope would run a finalizer that referenced an uninitialised binding.
    const cursor = yield* Effect.acquireRelease(
      Effect.sync(() => client.query(new Cursor(sql, params as any))),
      (opened) => Effect.promise(() => opened.close())
    )

    // A single pull reads the next batch. Failing with `Option.none()` is the
    // end-of-stream signal understood by `Stream.repeatEffectChunkOption`.
    const pull = Effect.async<Chunk.Chunk<any>, Option.Option<SqlError>>((resume) => {
      cursor.read(128, (err, rows) => {
        if (err) {
          resume(Effect.fail(Option.some(new SqlError({ cause: err, message: "Failed to execute statement" }))))
        } else if (Arr.isNonEmptyArray(rows)) {
          resume(Effect.succeed(Chunk.unsafeFromArray(transformRows ? transformRows(rows) as any : rows)))
        } else {
          resume(Effect.fail(Option.none()))
        }
      })
    })

    return Stream.repeatEffectChunkOption(pull)
  }).pipe(
    Stream.unwrapScoped
  )
}
273 }
274
275 const reserveRaw = Effect.async<Pg.PoolClient, SqlError, Scope.Scope>((resume) => {

Callers 1

streamMethod · 0.45

Calls 6

readMethod · 0.80
pipeMethod · 0.65
addFinalizerMethod · 0.65
closeMethod · 0.65
failMethod · 0.65
resumeFunction · 0.50

Tested by

no test coverage detected