MCPcopy Index your code
hub / github.com/ray-project/ray / SQLDatasink

Class SQLDatasink

python/ray/data/_internal/datasource/sql_datasink.py:9–35  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

7
8
9class SQLDatasink(Datasink[None]):
10
11 _MAX_ROWS_PER_WRITE = 128
12
13 def __init__(self, sql: str, connection_factory: Callable[[], Connection]):
14 self.sql = sql
15 self.connection_factory = connection_factory
16
17 def write(
18 self,
19 blocks: Iterable[Block],
20 ctx: TaskContext,
21 ) -> None:
22 with _connect(self.connection_factory) as cursor:
23 for block in blocks:
24 block_accessor = BlockAccessor.for_block(block)
25
26 values = []
27 for row in block_accessor.iter_rows(public_row_format=False):
28 values.append(tuple(row.values()))
29 assert len(values) <= self._MAX_ROWS_PER_WRITE, len(values)
30 if len(values) == self._MAX_ROWS_PER_WRITE:
31 cursor.executemany(self.sql, values)
32 values = []
33
34 if values:
35 cursor.executemany(self.sql, values)

Callers 1

write_sqlMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…