MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_sqlite_snapshot_write

Function test_sqlite_snapshot_write

python/pathway/tests/test_sqlite.py:291–339  ·  view source on GitHub ↗

Snapshot-mode writer maintains the current state of the Pathway table: insertions UPSERT on the primary key, deletions remove the matching row. After the pipeline finishes, the SQLite table contains exactly the logically-live rows — earlier values for an updated key do not leak, and

(tmp_path: pathlib.Path)

Source from the content-addressed store, hash-verified

289
290
291def test_sqlite_snapshot_write(tmp_path: pathlib.Path):
292 """Snapshot-mode writer maintains the current state of the Pathway
293 table: insertions UPSERT on the primary key, deletions remove the
294 matching row. After the pipeline finishes, the SQLite table
295 contains exactly the logically-live rows — earlier values for an
296 updated key do not leak, and retracted-only rows are gone.
297 """
298 database_path = tmp_path / "birds.db"
299
300 def payload(key: int, genus: str, epithet: str) -> bytes:
301 return json.dumps({"key": key, "genus": genus, "epithet": epithet}).encode()
302
303 class TestSubject(pw.io.python.ConnectorSubject):
304 def run(self):
305 # Initial insertions.
306 self._add(api.ref_scalar(1), payload(1, "upupa", "epops"))
307 self._add(api.ref_scalar(2), payload(2, "bubo", "scandiacus"))
308 self._add(api.ref_scalar(3), payload(3, "corvus", "corax"))
309 # Update row 1: retract the old version and insert a new one.
310 # Snapshot mode should UPSERT on primary_key=key.
311 self._remove(api.ref_scalar(1), payload(1, "upupa", "epops"))
312 self._add(api.ref_scalar(1), payload(1, "upupa", "marginata"))
313 # Delete row 3 outright.
314 self._remove(api.ref_scalar(3), payload(3, "corvus", "corax"))
315
316 class InputSchema(pw.Schema):
317 key: int
318 genus: str
319 epithet: str
320
321 table = pw.io.python.read(TestSubject(), schema=InputSchema)
322 pw.io.sqlite.write(
323 table,
324 database_path,
325 "birds",
326 output_table_type="snapshot",
327 primary_key=[table.key],
328 init_mode="create_if_not_exists",
329 )
330 run_all()
331
332 connection = sqlite3.connect(database_path)
333 rows = list(
334 connection.execute("SELECT key, genus, epithet FROM birds ORDER BY key")
335 )
336 assert rows == [
337 (1, "upupa", "marginata"),
338 (2, "bubo", "scandiacus"),
339 ]
340
341
342@pytest.mark.parametrize("init_mode", ["default", "create_if_not_exists", "replace"])

Callers

nothing calls this directly

Calls 3

run_allFunction · 0.90
writeMethod · 0.80
TestSubjectClass · 0.70

Tested by

no test coverage detected