(tmp_path: pathlib.Path)
| 24 | |
| 25 | @needs_multiprocessing_fork |
| 26 | def test_sqlite(tmp_path: pathlib.Path): |
| 27 | database_name = tmp_path / "test.db" |
| 28 | output_path = tmp_path / "output.csv" |
| 29 | |
| 30 | connection = sqlite3.connect(database_name) |
| 31 | cursor = connection.cursor() |
| 32 | cursor.execute( |
| 33 | """ |
| 34 | CREATE TABLE users ( |
| 35 | id INTEGER, |
| 36 | login TEXT, |
| 37 | name TEXT |
| 38 | ) |
| 39 | """ |
| 40 | ) |
| 41 | cursor.execute("INSERT INTO users (id, login, name) VALUES (1, 'alice', 'Alice')") |
| 42 | cursor.execute("INSERT INTO users (id, login, name) VALUES (2, 'bob1999', 'Bob')") |
| 43 | connection.commit() |
| 44 | |
| 45 | def stream_target(): |
| 46 | wait_result_with_checker(FileLinesNumberChecker(output_path, 2), 5, target=None) |
| 47 | connection = sqlite3.connect(database_name) |
| 48 | cursor = connection.cursor() |
| 49 | cursor.execute( |
| 50 | """ |
| 51 | INSERT INTO users (id, login, name) VALUES (3, 'ch123', 'Charlie')""" |
| 52 | ) |
| 53 | connection.commit() |
| 54 | |
| 55 | wait_result_with_checker(FileLinesNumberChecker(output_path, 3), 2, target=None) |
| 56 | cursor = connection.cursor() |
| 57 | cursor.execute("UPDATE users SET name = 'Bob Smith' WHERE id = 2") |
| 58 | connection.commit() |
| 59 | |
| 60 | wait_result_with_checker(FileLinesNumberChecker(output_path, 5), 2, target=None) |
| 61 | cursor = connection.cursor() |
| 62 | cursor.execute("DELETE FROM users WHERE id = 3") |
| 63 | connection.commit() |
| 64 | |
| 65 | class InputSchema(pw.Schema): |
| 66 | id: int |
| 67 | login: str |
| 68 | name: str |
| 69 | |
| 70 | table = pw.io.sqlite.read( |
| 71 | database_name, "users", InputSchema, autocommit_duration_ms=1 |
| 72 | ) |
| 73 | pw.io.jsonlines.write(table, output_path) |
| 74 | |
| 75 | inputs_thread = threading.Thread(target=stream_target, daemon=True) |
| 76 | inputs_thread.start() |
| 77 | |
| 78 | wait_result_with_checker(FileLinesNumberChecker(output_path, 6), 30) |
| 79 | |
| 80 | events = [] |
| 81 | with open(output_path) as f: |
| 82 | for row in f: |
| 83 | events.append(json.loads(row)) |
nothing calls this directly
no test coverage detected