Snapshot-mode writer maintains the current state of the Pathway table: insertions UPSERT on the primary key, deletions remove the matching row. After the pipeline finishes, the SQLite table contains exactly the logically-live rows — earlier values for an updated key do not leak, and retracted-only rows are gone.
(tmp_path: pathlib.Path)
| 289 | |
| 290 | |
def test_sqlite_snapshot_write(tmp_path: pathlib.Path):
    """Snapshot-mode writer maintains the current state of the Pathway
    table: insertions UPSERT on the primary key, deletions remove the
    matching row. After the pipeline finishes, the SQLite table
    contains exactly the logically-live rows — earlier values for an
    updated key do not leak, and retracted-only rows are gone.
    """
    database_path = tmp_path / "birds.db"

    def payload(key: int, genus: str, epithet: str) -> bytes:
        """Encode one row as JSON bytes, the form passed to the subject."""
        return json.dumps({"key": key, "genus": genus, "epithet": epithet}).encode()

    class TestSubject(pw.io.python.ConnectorSubject):
        def run(self):
            # Initial insertions.
            self._add(api.ref_scalar(1), payload(1, "upupa", "epops"))
            self._add(api.ref_scalar(2), payload(2, "bubo", "scandiacus"))
            self._add(api.ref_scalar(3), payload(3, "corvus", "corax"))
            # Update row 1: retract the old version and insert a new one.
            # Snapshot mode should UPSERT on primary_key=key.
            self._remove(api.ref_scalar(1), payload(1, "upupa", "epops"))
            self._add(api.ref_scalar(1), payload(1, "upupa", "marginata"))
            # Delete row 3 outright.
            self._remove(api.ref_scalar(3), payload(3, "corvus", "corax"))

    class InputSchema(pw.Schema):
        key: int
        genus: str
        epithet: str

    table = pw.io.python.read(TestSubject(), schema=InputSchema)
    pw.io.sqlite.write(
        table,
        database_path,
        "birds",
        output_table_type="snapshot",
        primary_key=[table.key],
        init_mode="create_if_not_exists",
    )
    run_all()

    connection = sqlite3.connect(database_path)
    try:
        rows = list(
            connection.execute("SELECT key, genus, epithet FROM birds ORDER BY key")
        )
    finally:
        # Close explicitly so the database file handle is released before
        # the temporary directory is cleaned up; using the connection as a
        # context manager would only manage the transaction, not close it.
        connection.close()

    # Key 1 carries only its updated epithet, key 2 is untouched, and the
    # deleted key 3 is absent.
    assert rows == [
        (1, "upupa", "marginata"),
        (2, "bubo", "scandiacus"),
    ]
| 340 | |
| 341 | |
| 342 | @pytest.mark.parametrize("init_mode", ["default", "create_if_not_exists", "replace"]) |
nothing calls this directly
no test coverage detected