MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_sqlite_write_read_round_trip

Function test_sqlite_write_read_round_trip

python/pathway/tests/test_sqlite.py:237–288  ·  view source on GitHub ↗

End-to-end round-trip: write a table covering every Pathway type to SQLite via ``pw.io.sqlite.write``, read it back through ``pw.io.sqlite.read``, and verify that both paths produce identical jsonlines — i.e. the writer/reader pair round-trips every type without loss. Exercised in both ``stream_of_changes`` and ``snapshot`` output modes.

(
    tmp_path: pathlib.Path, serialization_tester, output_table_type
)

Source from the content-addressed store, hash-verified

@needs_multiprocessing_fork
@pytest.mark.parametrize("output_table_type", ["stream_of_changes", "snapshot"])
def test_sqlite_write_read_round_trip(
    tmp_path: pathlib.Path, serialization_tester, output_table_type
):
    """End-to-end round-trip: write a table covering every Pathway type to
    SQLite via ``pw.io.sqlite.write``, read it back through
    ``pw.io.sqlite.read``, and verify that both paths produce identical
    jsonlines — i.e. the writer/reader pair round-trips every type
    without loss. Exercised in both ``stream_of_changes`` and
    ``snapshot`` output modes.
    """
    database_path = tmp_path / "variety.db"
    direct_path = tmp_path / "direct.jsonl"
    roundtrip_path = tmp_path / "roundtrip.jsonl"

    # Ground truth: write the original table straight to jsonlines.
    table, known_rows = serialization_tester.create_variety_table(with_optionals=False)
    schema = table.schema
    pw.io.jsonlines.write(table, direct_path)
    run_all()
    G.clear()

    # Writer leg: the graph was cleared above, so build the same table
    # again and write it through the SQLite output connector.
    table, _ = serialization_tester.create_variety_table(with_optionals=False)
    write_kwargs: dict = {"init_mode": "create_if_not_exists"}
    if output_table_type == "snapshot":
        # Snapshot mode is configured with ``pkey`` as the primary key.
        write_kwargs["output_table_type"] = "snapshot"
        write_kwargs["primary_key"] = [table.pkey]
    pw.io.sqlite.write(table, database_path, "variety", **write_kwargs)
    run_all()
    G.clear()

    # Reader leg: read the SQLite table back with the original schema and
    # re-emit jsonlines; wait until one line per known row has arrived
    # (the trailing 30 is presumably a timeout in seconds — confirm).
    read_table = pw.io.sqlite.read(
        database_path, "variety", schema, autocommit_duration_ms=1
    )
    pw.io.jsonlines.write(read_table, roundtrip_path)
    wait_result_with_checker(
        FileLinesNumberChecker(roundtrip_path, len(known_rows)), 30
    )

    def load(path: pathlib.Path) -> list[dict]:
        """Parse a jsonlines file into data-only rows sorted by ``pkey``.

        Blank lines are skipped. The per-run ``time``/``diff`` metadata is
        removed so that outputs of different runs can be compared.
        """
        rows = []
        # Decode explicitly as UTF-8 rather than the locale default so that
        # non-ASCII string values decode the same way on every platform.
        with open(path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                row = json.loads(line)
                # The time/diff metadata differs between runs; compare only
                # the data columns.
                row.pop("time", None)
                row.pop("diff", None)
                rows.append(row)
        rows.sort(key=lambda r: r["pkey"])
        return rows

    direct_rows = load(direct_path)
    roundtrip_rows = load(roundtrip_path)
    # Guard the ground truth itself: without this, identical-but-wrong
    # outputs (e.g. both missing rows) would still compare equal.
    assert len(direct_rows) == len(known_rows)
    assert direct_rows == roundtrip_rows
289
290
291def test_sqlite_snapshot_write(tmp_path: pathlib.Path):

Callers

nothing calls this directly

Calls 7

run_all — Function · 0.90
wait_result_with_checker — Function · 0.90
load — Function · 0.85
create_variety_table — Method · 0.80
write — Method · 0.80
clear — Method · 0.80

Tested by

no test coverage detected