MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_forget_streaming

Function test_forget_streaming

python/pathway/tests/test_persistence.py:790–825  ·  view source on GitHub ↗
(tmp_path, mode)

Source from the content-addressed store, hash-verified

@only_with_license_key("mode", [api.PersistenceMode.OPERATOR_PERSISTING])
@needs_multiprocessing_fork
def test_forget_streaming(tmp_path, mode):
    """Check ``_forget`` output across repeated streaming runs with persistence.

    Every step drops one more CSV file into the same input directory,
    rebuilds the computation graph from scratch, and runs it against the
    same filesystem persistence backend. The step passes once the checker
    built from the output file and the expected set is satisfied within the
    timeout.

    NOTE(review): the expected entries look like ``"<t>,<diff>"`` with ``1``
    for an added row and ``-1`` for a retracted one -- confirm against
    ``get_checker``.
    """

    class InputSchema(pw.Schema):
        t: int

    input_path = tmp_path / "1"
    os.makedirs(input_path)
    output_path = tmp_path / "out.csv"
    persistent_storage_path = tmp_path / "p"
    persistence_config = pw.persistence.Config(
        pw.persistence.Backend.filesystem(persistent_storage_path),
        persistence_mode=mode,
    )
    # Number of steps executed so far; also names the next input file.
    count = 0

    def wait_result(lines: list[str], expected_rows: set[str]) -> None:
        """Write one more input file, run the pipeline, and await the output."""
        nonlocal count
        count += 1
        # Start from an empty graph so each run defines the pipeline afresh.
        G.clear()
        write_lines(input_path / str(count), lines)

        source = pw.io.csv.read(input_path, schema=InputSchema, mode="streaming")
        forgotten = source._forget(
            pw.this.t + 10,
            pw.this.t,
            mark_forgetting_records=False,
        )
        pw.io.csv.write(forgotten, output_path)

        checker = get_checker(output_path, expected_rows)
        wait_result_with_checker(
            checker,
            timeout_sec=10,
            target=run,
            kwargs=dict(persistence_config=persistence_config),
        )

    # (lines of the new input file, expected output entries) for each run,
    # executed strictly in this order because every run builds on the state
    # persisted by the previous ones.
    steps: list[tuple[list[str], set[str]]] = [
        (["t", "1", "3", "11"], {"1,1", "3,1", "11,1"}),
        (["t", "15", "16"], {"1,-1", "15,1", "16,1"}),
        (["t", "6", "21"], {"3,-1", "21,1"}),
        (["t", "9", "10"], {"11,-1"}),
        (["t", "26"], {"26,1"}),
        (["t", "22"], {"15,-1", "16,-1", "22,1"}),
    ]
    for lines, expected_rows in steps:
        wait_result(lines, expected_rows)
826
827
828@pytest.mark.parametrize(

Callers

nothing calls this directly

Calls 2

wait_result · Function · 0.85
filesystem · Method · 0.80

Tested by

no test coverage detected