(tmp_path, mode)
@only_with_license_key("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
@needs_multiprocessing_fork
def test_forget_streaming(tmp_path, mode):
    """Run a streaming ``_forget`` pipeline several times over shared state.

    Every step drops one more CSV file into the watched input directory,
    rebuilds the graph from scratch, and runs it with the same filesystem
    persistence config until the checker built for that step's expected
    rows accepts the output file (or the 10 second timeout expires).

    NOTE(review): the expected entries look like ``"<t>,<diff>"`` pairs,
    with ``-1`` for rows dropped by ``_forget`` -- confirm against
    ``get_checker``.
    """

    class InputSchema(pw.Schema):
        t: int

    input_path = tmp_path / "1"
    os.makedirs(input_path)
    output_path = tmp_path / "out.csv"
    persistence_config = pw.persistence.Config(
        pw.persistence.Backend.filesystem(tmp_path / "p"),
        persistence_mode=mode,
    )

    # (lines of the new input file, rows the checker expects after the run)
    steps: list[tuple[list[str], set[str]]] = [
        (["t", "1", "3", "11"], {"1,1", "3,1", "11,1"}),
        (["t", "15", "16"], {"1,-1", "15,1", "16,1"}),
        (["t", "6", "21"], {"3,-1", "21,1"}),
        (["t", "9", "10"], {"11,-1"}),
        (["t", "26"], {"26,1"}),
        (["t", "22"], {"15,-1", "16,-1", "22,1"}),
    ]

    # Input files are named after the 1-based step number.
    for step_number, (lines, expected) in enumerate(steps, start=1):
        # Start from an empty graph so every run defines the pipeline anew.
        G.clear()
        write_lines(input_path / str(step_number), lines)

        source = pw.io.csv.read(input_path, schema=InputSchema, mode="streaming")
        remaining = source._forget(
            pw.this.t + 10, pw.this.t, mark_forgetting_records=False
        )
        pw.io.csv.write(remaining, output_path)

        wait_result_with_checker(
            get_checker(output_path, expected),
            timeout_sec=10,
            target=run,
            kwargs={"persistence_config": persistence_config},
        )
| 826 | |
| 827 | |
| 828 | @pytest.mark.parametrize( |
nothing calls this directly
no test coverage detected