| 243 | return this.execute(sql, params, transformRows) |
| 244 | } |
| 245 | executeStream( |
| 246 | sql: string, |
| 247 | params: ReadonlyArray<unknown>, |
| 248 | transformRows: (<A extends object>(row: ReadonlyArray<A>) => ReadonlyArray<A>) | undefined |
| 249 | ) { |
| 250 | // eslint-disable-next-line @typescript-eslint/no-this-alias |
| 251 | const self = this |
| 252 | return Effect.gen(function*() { |
| 253 | const scope = yield* Effect.scope |
| 254 | const client = self.pg ?? (yield* reserveRaw) |
| 255 | yield* Scope.addFinalizer(scope, Effect.promise(() => cursor.close())) |
| 256 | const cursor = client.query(new Cursor(sql, params as any)) |
| 257 | const pull = Effect.async<Chunk.Chunk<any>, Option.Option<SqlError>>((resume) => { |
| 258 | cursor.read(128, (err, rows) => { |
| 259 | if (err) { |
| 260 | resume(Effect.fail(Option.some(new SqlError({ cause: err, message: "Failed to execute statement" })))) |
| 261 | } else if (Arr.isNonEmptyArray(rows)) { |
| 262 | resume(Effect.succeed(Chunk.unsafeFromArray(transformRows ? transformRows(rows) as any : rows))) |
| 263 | } else { |
| 264 | resume(Effect.fail(Option.none())) |
| 265 | } |
| 266 | }) |
| 267 | }) |
| 268 | return Stream.repeatEffectChunkOption(pull) |
| 269 | }).pipe( |
| 270 | Stream.unwrapScoped |
| 271 | ) |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | const reserveRaw = Effect.async<Pg.PoolClient, SqlError, Scope.Scope>((resume) => { |