MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_async_transformer

Function test_async_transformer

python/pathway/tests/test_persistence.py:1007–1022  ·  view source on GitHub ↗
(tmp_path, mode)

Source from the content-addressed store, hash-verified

1005@pytest.mark.parametrize("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
1006@needs_multiprocessing_fork
1007def test_async_transformer(tmp_path, mode):
1008 input_path = tmp_path / "1"
1009 can_pass = [True, True, False, False, True, True, True, False, True]
1010 wait_result = get_async_transformer_tester(
1011 tmp_path, input_path, mode, False, can_pass
1012 )
1013 wait_result(["a,b", "0,0", "1,1", "2,2"], {"0,-1,1", "1,0,1"})
1014 can_pass[2] = True
1015 can_pass[3] = True
1016 wait_result(["a,b", "3,3"], {"2,1,1", "3,2,1"})
1017 wait_result(["a,b", "0,4", "2,3"], {"0,-1,-1", "2,1,-1", "0,3,1", "2,2,1"})
1018 wait_result(["a,b", "6,6", "7,7"], {"6,5,1"})
1019 os.remove(input_path / "4")
1020 wait_result(["a,b", "8,8"], {"6,5,-1", "8,7,1"})
1021 os.remove(input_path / "3")
1022 wait_result(["a,b"], {"0,3,-1", "2,2,-1"})
1023
1024
1025@pytest.mark.parametrize("mode", [api.PersistenceMode.OPERATOR_PERSISTING])

Callers

nothing calls this directly

Calls 3

wait_resultFunction · 0.85
removeMethod · 0.45

Tested by

no test coverage detected