MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_fully_async_udf

Function test_fully_async_udf

python/pathway/tests/test_persistence.py:1027–1045  ·  view source on GitHub ↗
(tmp_path, mode)

Source from the content-addressed store, hash-verified

1025@pytest.mark.parametrize("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
1026@needs_multiprocessing_fork
1027def test_fully_async_udf(tmp_path, mode):
1028 input_path = tmp_path / "1"
1029 can_pass = [True, True, False, False, True, True, True, False, True]
1030 wait_result = get_async_transformer_tester(
1031 tmp_path, input_path, mode, True, can_pass
1032 )
1033 wait_result(["a,b", "0,0", "1,1", "2,2"], {"0,-1,1", "1,0,1"})
1034 can_pass[2] = True
1035 can_pass[3] = True
1036 wait_result(["a,b", "3,3"], {"2,1,1", "3,2,1"})
1037 os.remove(input_path / "1")
1038 wait_result(
1039 ["a,b", "0,4", "2,3"], {"0,-1,-1", "2,1,-1", "0,3,1", "2,2,1", "1,0,-1"}
1040 )
1041 wait_result(["a,b", "6,6", "7,7"], {"6,5,1"})
1042 os.remove(input_path / "4")
1043 wait_result(["a,b", "8,8"], {"6,5,-1", "8,7,1"})
1044 os.remove(input_path / "3")
1045 wait_result(["a,b"], {"0,3,-1", "2,2,-1"})
1046
1047
1048@pytest.mark.parametrize(

Callers

nothing calls this directly

Calls 3

wait_resultFunction · 0.85
removeMethod · 0.45

Tested by

no test coverage detected