| 186 | }, |
| 187 | executeStream(sql, params, transformRows) { |
| 188 | function* stream() { |
| 189 | for (const stmt of sqlite3.statements(db, sql)) { |
| 190 | let columns: Array<string> | undefined |
| 191 | sqlite3.bind_collection(stmt, params as any) |
| 192 | while (sqlite3.step(stmt) === WaSqlite.SQLITE_ROW) { |
| 193 | columns = columns ?? sqlite3.column_names(stmt) |
| 194 | const row = sqlite3.row(stmt) |
| 195 | const obj: Record<string, any> = {} |
| 196 | for (let i = 0; i < columns.length; i++) { |
| 197 | obj[columns[i]] = row[i] |
| 198 | } |
| 199 | yield obj |
| 200 | } |
| 201 | } |
| 202 | } |
| 203 | return Stream.suspend(() => Stream.fromIteratorSucceed(stream()[Symbol.iterator]())).pipe( |
| 204 | transformRows |
| 205 | ? Stream.mapChunks((chunk) => Chunk.unsafeFromArray(transformRows(Chunk.toReadonlyArray(chunk)))) |