| 7 | |
| 8 | |
| 9 | class SQLDatasink(Datasink[None]): |
| 10 | |
| 11 | _MAX_ROWS_PER_WRITE = 128 |
| 12 | |
| 13 | def __init__(self, sql: str, connection_factory: Callable[[], Connection]): |
| 14 | self.sql = sql |
| 15 | self.connection_factory = connection_factory |
| 16 | |
| 17 | def write( |
| 18 | self, |
| 19 | blocks: Iterable[Block], |
| 20 | ctx: TaskContext, |
| 21 | ) -> None: |
| 22 | with _connect(self.connection_factory) as cursor: |
| 23 | for block in blocks: |
| 24 | block_accessor = BlockAccessor.for_block(block) |
| 25 | |
| 26 | values = [] |
| 27 | for row in block_accessor.iter_rows(public_row_format=False): |
| 28 | values.append(tuple(row.values())) |
| 29 | assert len(values) <= self._MAX_ROWS_PER_WRITE, len(values) |
| 30 | if len(values) == self._MAX_ROWS_PER_WRITE: |
| 31 | cursor.executemany(self.sql, values) |
| 32 | values = [] |
| 33 | |
| 34 | if values: |
| 35 | cursor.executemany(self.sql, values) |
no outgoing calls
no test coverage detected
searching dependent graphs…