MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_buffer

Function test_buffer

python/pathway/tests/test_persistence.py:731–765  ·  view source on GitHub ↗
(tmp_path, mode)

Source from the content-addressed store, hash-verified

729@only_with_license_key("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
730@needs_multiprocessing_fork
731def test_buffer(tmp_path, mode):
732 class InputSchema(pw.Schema):
733 t: int
734
735 input_path = tmp_path / "1"
736 os.makedirs(input_path)
737 output_path = tmp_path / "out.csv"
738 persistent_storage_path = tmp_path / "p"
739 count = 0
740 persistence_config = pw.persistence.Config(
741 pw.persistence.Backend.filesystem(persistent_storage_path),
742 persistence_mode=mode,
743 )
744
745 def wait_result(inputs: list[str], expected: set[str]) -> None:
746 nonlocal count
747 count += 1
748 G.clear()
749 path = input_path / str(count)
750 write_lines(path, inputs)
751 t_1 = pw.io.csv.read(input_path, schema=InputSchema, mode="streaming")
752 res = t_1._buffer(pw.this.t + 10, pw.this.t)
753 pw.io.csv.write(res, output_path)
754 wait_result_with_checker(
755 get_checker(output_path, expected),
756 timeout_sec=10,
757 target=run,
758 kwargs={"persistence_config": persistence_config},
759 )
760
761 wait_result(["t", "1", "3", "11"], {"1,1"})
762 wait_result(["t", "15", "16"], {"3,1"})
763 wait_result(["t", "6", "21"], {"6,1", "11,1"})
764 wait_result(["t", "9", "10"], {"9,1", "10,1"})
765 wait_result(["t", "26"], {"15,1", "16,1"})
766
767
768@pytest.mark.parametrize("mode", [api.PersistenceMode.OPERATOR_PERSISTING])

Callers

nothing calls this directly

Calls 2

wait_resultFunction · 0.85
filesystemMethod · 0.80

Tested by

no test coverage detected