MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_sqlite_write_init_mode

Function test_sqlite_write_init_mode

python/pathway/tests/test_sqlite.py:343–385  ·  view source on GitHub ↗

Each ``init_mode`` must pre-stage the destination table in the right shape so that a subsequent write lands in a table with exactly the expected row: * ``"default"`` — the table must already exist; we pre-create it with a value that would be visible if the writer skipped init_mode

(tmp_path: pathlib.Path, init_mode)

Source from the content-addressed store, hash-verified

341
342@pytest.mark.parametrize("init_mode", ["default", "create_if_not_exists", "replace"])
343def test_sqlite_write_init_mode(tmp_path: pathlib.Path, init_mode):
344 """Each ``init_mode`` must pre-stage the destination table in the
345 right shape so that a subsequent write lands in a table with exactly
346 the expected row:
347
348 * ``"default"`` — the table must already exist; we pre-create it with
349 a value that would be visible if the writer skipped init_mode
350 entirely, and assert it survives (single row from the writer, no
351 leftover).
352 * ``"create_if_not_exists"`` — starting from a missing table, the
353 writer creates it and inserts one row.
354 * ``"replace"`` — starting from an *existing* table containing a
355 different row, the writer drops it and inserts one fresh row; no
356 trace of the old row remains.
357 """
358 database_path = tmp_path / "init_mode.db"
359 connection = sqlite3.connect(database_path)
360 if init_mode == "default":
361 # Writer expects the table to exist; stream_of_changes also
362 # requires the time/diff metadata columns.
363 connection.execute(
364 "CREATE TABLE notes (k INTEGER, v TEXT, time INTEGER, diff INTEGER)"
365 )
366 elif init_mode == "replace":
367 # Writer should drop this pre-existing table and recreate it
368 # with a fresh schema derived from the Pathway table.
369 connection.execute("CREATE TABLE notes (stale TEXT)")
370 connection.execute("INSERT INTO notes VALUES ('old')")
371 connection.commit()
372 connection.close()
373
374 table = pw.debug.table_from_markdown(
375 """
376 k | v
377 1 | hello
378 """
379 )
380 pw.io.sqlite.write(table, database_path, "notes", init_mode=init_mode)
381 run_all()
382
383 connection = sqlite3.connect(database_path)
384 rows = list(connection.execute("SELECT k, v FROM notes"))
385 assert rows == [(1, "hello")]
386
387
388def test_sqlite_table_name_with_special_characters(tmp_path: pathlib.Path):

Callers

nothing calls this directly

Calls 5

run_allFunction · 0.90
commitMethod · 0.80
table_from_markdownMethod · 0.80
writeMethod · 0.80
closeMethod · 0.45

Tested by

no test coverage detected