MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_sqlite

Function test_sqlite

python/pathway/tests/test_sqlite.py:26–97  ·  view source on GitHub ↗
(tmp_path: pathlib.Path)

Source from the content-addressed store, hash-verified

24
25@needs_multiprocessing_fork
26def test_sqlite(tmp_path: pathlib.Path):
27 database_name = tmp_path / "test.db"
28 output_path = tmp_path / "output.csv"
29
30 connection = sqlite3.connect(database_name)
31 cursor = connection.cursor()
32 cursor.execute(
33 """
34 CREATE TABLE users (
35 id INTEGER,
36 login TEXT,
37 name TEXT
38 )
39 """
40 )
41 cursor.execute("INSERT INTO users (id, login, name) VALUES (1, 'alice', 'Alice')")
42 cursor.execute("INSERT INTO users (id, login, name) VALUES (2, 'bob1999', 'Bob')")
43 connection.commit()
44
45 def stream_target():
46 wait_result_with_checker(FileLinesNumberChecker(output_path, 2), 5, target=None)
47 connection = sqlite3.connect(database_name)
48 cursor = connection.cursor()
49 cursor.execute(
50 """
51 INSERT INTO users (id, login, name) VALUES (3, 'ch123', 'Charlie')"""
52 )
53 connection.commit()
54
55 wait_result_with_checker(FileLinesNumberChecker(output_path, 3), 2, target=None)
56 cursor = connection.cursor()
57 cursor.execute("UPDATE users SET name = 'Bob Smith' WHERE id = 2")
58 connection.commit()
59
60 wait_result_with_checker(FileLinesNumberChecker(output_path, 5), 2, target=None)
61 cursor = connection.cursor()
62 cursor.execute("DELETE FROM users WHERE id = 3")
63 connection.commit()
64
65 class InputSchema(pw.Schema):
66 id: int
67 login: str
68 name: str
69
70 table = pw.io.sqlite.read(
71 database_name, "users", InputSchema, autocommit_duration_ms=1
72 )
73 pw.io.jsonlines.write(table, output_path)
74
75 inputs_thread = threading.Thread(target=stream_target, daemon=True)
76 inputs_thread.start()
77
78 wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 30)
79
80 events = []
81 with open(output_path) as f:
82 for row in f:
83 events.append(json.loads(row))

Callers

nothing calls this directly

Calls 7

wait_result_with_checkerFunction · 0.90
commitMethod · 0.80
writeMethod · 0.80
sortMethod · 0.80
startMethod · 0.45
loadsMethod · 0.45

Tested by

no test coverage detected