(tmp_path: pathlib.Path)
| 1619 | |
| 1620 | |
| 1621 | def test_binary_data_in_subscribe(tmp_path: pathlib.Path): |
| 1622 | input_path = tmp_path / "input.txt" |
| 1623 | input_full_contents = "abc\n\ndef\nghi" |
| 1624 | write_lines(input_path, input_full_contents) |
| 1625 | |
| 1626 | table = pw.io.fs.read( |
| 1627 | input_path, |
| 1628 | format="binary", |
| 1629 | mode="static", |
| 1630 | autocommit_duration_ms=1000, |
| 1631 | ) |
| 1632 | |
| 1633 | rows = [] |
| 1634 | |
| 1635 | def on_change(key, row, time, is_addition): |
| 1636 | rows.append(row) |
| 1637 | |
| 1638 | def on_end(*args, **kwargs): |
| 1639 | pass |
| 1640 | |
| 1641 | pw.io.subscribe(table, on_change=on_change, on_end=on_end) |
| 1642 | run() |
| 1643 | |
| 1644 | assert rows == [ |
| 1645 | { |
| 1646 | "data": (input_full_contents + "\n").encode("utf-8"), |
| 1647 | } |
| 1648 | ] |
| 1649 | |
| 1650 | |
| 1651 | def test_bool_values_parsing_in_csv(tmp_path: pathlib.Path): |
nothing calls this directly
no test coverage detected