(tmp_path, mode)
@pytest.mark.parametrize("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
@needs_multiprocessing_fork
def test_fully_async_udf(tmp_path, mode):
    """Step the async-transformer tester through a sequence of input changes.

    Every step hands the tester a list of CSV lines (the ``a,b`` header
    first) together with the set of strings expected for that step. Between
    steps the test flips entries of the gate list it shared with the tester
    and removes paths below the input directory.

    NOTE(review): the expected strings look like ``a,result,diff`` rows where
    ``1`` / ``-1`` mark an insertion / retraction, and the gate list
    presumably decides which calls may complete -- confirm both against
    ``get_async_transformer_tester``.
    """
    header = "a,b"
    inputs_dir = tmp_path / "1"
    gates = [True, True, False, False, True, True, True, False, True]
    run_step = get_async_transformer_tester(
        tmp_path, inputs_dir, mode, True, gates
    )

    run_step([header, "0,0", "1,1", "2,2"], {"0,-1,1", "1,0,1"})

    # Flip the two gates that started out closed. The list is mutated in
    # place because this same object was handed to the tester above.
    for gate_index in (2, 3):
        gates[gate_index] = True
    run_step([header, "3,3"], {"2,1,1", "3,2,1"})

    # NOTE(review): the names "1", "4", "3" are presumably chosen by the
    # tester for the inputs of individual steps -- verify in the helper.
    os.remove(inputs_dir / "1")
    after_first_removal = {"0,-1,-1", "2,1,-1", "0,3,1", "2,2,1", "1,0,-1"}
    run_step([header, "0,4", "2,3"], after_first_removal)

    run_step([header, "6,6", "7,7"], {"6,5,1"})

    os.remove(inputs_dir / "4")
    run_step([header, "8,8"], {"6,5,-1", "8,7,1"})

    os.remove(inputs_dir / "3")
    run_step([header], {"0,3,-1", "2,2,-1"})
| 1046 | |
| 1047 | |
| 1048 | @pytest.mark.parametrize( |
nothing calls this directly
no test coverage detected