End-to-end round-trip: write a table covering every Pathway type to SQLite via ``pw.io.sqlite.write``, read it back through ``pw.io.sqlite.read``, and verify that both paths produce identical jsonlines — i.e. the writer/reader pair round-trips every type without loss. Exercised in both ``stream_of_changes`` and ``snapshot`` output modes.
(
tmp_path: pathlib.Path, serialization_tester, output_table_type
)
@needs_multiprocessing_fork
@pytest.mark.parametrize("output_table_type", ["stream_of_changes", "snapshot"])
def test_sqlite_write_read_round_trip(
    tmp_path: pathlib.Path, serialization_tester, output_table_type
):
    """Check that the SQLite writer and reader are lossless as a pair.

    The variety table produced by ``serialization_tester`` is dumped to
    jsonlines twice: once directly, and once after a detour through
    ``pw.io.sqlite.write`` followed by ``pw.io.sqlite.read``. Ignoring
    the ``time``/``diff`` metadata, both dumps must hold the same rows.
    Parametrized over the ``stream_of_changes`` and ``snapshot`` output
    modes of the writer.
    """

    def _data_rows(path: pathlib.Path) -> list[dict]:
        # Parse one jsonlines dump, dropping the per-run time/diff
        # metadata so that only the data columns take part in the
        # comparison; rows are ordered by pkey to make it order-agnostic.
        records = []
        with open(path) as stream:
            for line in stream:
                record = json.loads(line)
                record.pop("time", None)
                record.pop("diff", None)
                records.append(record)
        return sorted(records, key=lambda record: record["pkey"])

    database_path = tmp_path / "variety.db"
    direct_path = tmp_path / "direct.jsonl"
    roundtrip_path = tmp_path / "roundtrip.jsonl"

    # Leg 1 (reference): the table goes straight to jsonlines.
    reference_table, expected_rows = serialization_tester.create_variety_table(
        with_optionals=False
    )
    schema = reference_table.schema
    pw.io.jsonlines.write(reference_table, direct_path)
    run_all()
    G.clear()

    # Leg 2 (writer): an identical table goes through the SQLite output
    # connector; snapshot mode additionally needs the primary key.
    sqlite_input, _ = serialization_tester.create_variety_table(with_optionals=False)
    sqlite_options: dict = {"init_mode": "create_if_not_exists"}
    if output_table_type == "snapshot":
        sqlite_options.update(
            output_table_type="snapshot",
            primary_key=[sqlite_input.pkey],
        )
    pw.io.sqlite.write(sqlite_input, database_path, "variety", **sqlite_options)
    run_all()
    G.clear()

    # Leg 3 (reader): pull the SQLite table back in and dump it to
    # jsonlines, waiting until every expected row has shown up.
    restored_table = pw.io.sqlite.read(
        database_path, "variety", schema, autocommit_duration_ms=1
    )
    pw.io.jsonlines.write(restored_table, roundtrip_path)
    expected_line_count = len(expected_rows)
    wait_result_with_checker(
        FileLinesNumberChecker(roundtrip_path, expected_line_count), 30
    )

    direct_rows = _data_rows(direct_path)
    roundtrip_rows = _data_rows(roundtrip_path)
    assert direct_rows == roundtrip_rows
| 289 | |
| 290 | |
| 291 | def test_sqlite_snapshot_write(tmp_path: pathlib.Path): |
nothing calls this directly
no test coverage detected