(tmp_path, mode)
@only_with_license_key("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
@needs_multiprocessing_fork
def test_buffer(tmp_path, mode):
    """Exercise ``Table._buffer`` across several runs sharing one persistent storage.

    Every step drops one more CSV file into the watched input directory,
    rebuilds the graph from scratch, and waits until the checker built from
    that step's expected rows accepts the output file.
    """

    class InputSchema(pw.Schema):
        t: int

    watched_dir = tmp_path / "1"
    os.makedirs(watched_dir)
    result_file = tmp_path / "out.csv"
    storage_dir = tmp_path / "p"
    config = pw.persistence.Config(
        pw.persistence.Backend.filesystem(storage_dir),
        persistence_mode=mode,
    )
    # Number of steps started so far; also used as the next input file name.
    steps_started = 0

    def feed_and_check(inputs: list[str], expected: set[str]) -> None:
        """Write ``inputs`` as a new file, run the pipeline, wait for ``expected``."""
        nonlocal steps_started
        steps_started += 1

        # Start from an empty graph so each step defines the pipeline anew.
        G.clear()
        write_lines(watched_dir / str(steps_started), inputs)

        # The whole directory is read, so files from earlier steps are still there.
        source = pw.io.csv.read(watched_dir, schema=InputSchema, mode="streaming")
        buffered = source._buffer(pw.this.t + 10, pw.this.t)
        pw.io.csv.write(buffered, result_file)

        checker = get_checker(result_file, expected)
        wait_result_with_checker(
            checker,
            timeout_sec=10,
            target=run,
            kwargs={"persistence_config": config},
        )

    # (lines of the new input file, rows expected in the output) per step;
    # the order matters because every step builds on the persisted state.
    steps = [
        (["t", "1", "3", "11"], {"1,1"}),
        (["t", "15", "16"], {"3,1"}),
        (["t", "6", "21"], {"6,1", "11,1"}),
        (["t", "9", "10"], {"9,1", "10,1"}),
        (["t", "26"], {"15,1", "16,1"}),
    ]
    for new_lines, expected_rows in steps:
        feed_and_check(new_lines, expected_rows)
| 766 | |
| 767 | |
| 768 | @pytest.mark.parametrize("mode", [api.PersistenceMode.OPERATOR_PERSISTING]) |
nothing calls this directly
no test coverage detected