
Function test_python_connector_persistence

python/pathway/tests/test_io.py:1076–1133 (github.com/pathwaycom/pathway)
(tmp_path: pathlib.Path)


```python
# Imports this function relies on; in test_io.py they live at module level.
import pathlib

import pandas as pd

import pathway as pw
from pathway.internals.parse_graph import G
from pathway.tests.utils import run, write_lines


def test_python_connector_persistence(tmp_path: pathlib.Path):
    persistent_storage_path = tmp_path / "PStorage"
    input_path = tmp_path / "input.txt"
    output_path = tmp_path / "output.txt"

    class TestSubject(pw.io.python.ConnectorSubject):
        def __init__(self, items):
            super().__init__()
            self.items = items

        def run(self):
            # Emit each item as one raw string row.
            for item in self.items:
                self.next_str(item)

    def run_computation(py_connector_input, fs_connector_input):
        # Rebuild the graph from scratch; earlier state comes only from persistent storage.
        G.clear()
        write_lines(input_path, "\n".join(fs_connector_input))
        table_py = pw.io.python.read(
            TestSubject(py_connector_input), format="raw", name="1"
        )
        table_csv = pw.io.plaintext.read(input_path, name="2", mode="static")
        table_joined = table_py.join(table_csv, table_py.data == table_csv.data).select(
            table_py.data
        )
        pw.io.csv.write(table_joined, output_path)
        run(
            persistence_config=pw.persistence.Config(
                pw.persistence.Backend.filesystem(persistent_storage_path),
            )
        )

    # Both the Python connector and the plaintext connector contain "one",
    # so together they form exactly one pair.
    run_computation(["one", "two"], ["one", "three"])
    result = pd.read_csv(output_path)
    assert set(result["data"]) == {"one"}

    # In a non-persistent run we would have an empty table on the left and a
    # four-element table on the right, so the join couldn't form any pairs.
    #
    # But we are running in persistent mode, and the table ["one", "two"] from the
    # previous run is persisted. The first two elements of the plaintext table are
    # persisted as well, so this run joins ["one", "two"] (persisted Python) with
    # ["two", "four"] (new plaintext). It produces one line: "two".
    run_computation([], ["one", "three", "two", "four"])
    result = pd.read_csv(output_path)
    assert set(result["data"]) == {"two"}

    # Now we add two more elements to the Python-connector table. The table in the
    # connector is now ["one", "two", "three", "four"], where ["one", "two"] are old
    # persisted elements, while ["three", "four"] are new.
    #
    # The second, plaintext table has no new elements, but since it is persisted,
    # the new Python rows are joined against all four of its rows, and the output
    # is ["three", "four"].
    run_computation(["three", "four"], ["one", "three", "two", "four"])
    result = pd.read_csv(output_path)
    assert set(result["data"]) == {"three", "four"}
```
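The expected output of all three runs can be checked with a small pure-Python model of the semantics the comments describe. This is a sketch under stated assumptions, not Pathway's implementation: `PersistedState` and `run_once` are hypothetical names, the persisted position of the plaintext connector is modeled as a plain line offset, and rows are treated as sets, matching the test's set comparisons. What it illustrates is the usual incremental (delta) join rule: new left rows are joined against all right rows, old left rows only against new right rows, so pairs emitted in an earlier run are never emitted again.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PersistedState:
    """Hypothetical stand-in for what persistent storage keeps between runs."""

    left: list[str] = field(default_factory=list)   # rows already read from the Python connector
    right: list[str] = field(default_factory=list)  # rows already read from the plaintext file
    right_offset: int = 0                           # assumed: number of file lines already consumed


def run_once(state: PersistedState, py_items: list[str], file_lines: list[str]) -> set[str]:
    """Return only the join results that are new in this run."""
    new_left = list(py_items)
    # Static re-read of the whole file: lines before the offset were seen earlier.
    new_right = file_lines[state.right_offset:]

    old_left, old_right = state.left, state.right
    all_right = set(old_right) | set(new_right)

    # Delta rule for an equi-join: new_left x (old_right + new_right), plus old_left x new_right.
    out = {x for x in new_left if x in all_right}
    out |= {x for x in old_left if x in set(new_right)}

    # Persist everything seen so far for the next run.
    state.left = old_left + new_left
    state.right = old_right + new_right
    state.right_offset = len(file_lines)
    return out


state = PersistedState()
print(sorted(run_once(state, ["one", "two"], ["one", "three"])))  # ['one']
print(sorted(run_once(state, [], ["one", "three", "two", "four"])))  # ['two']
print(sorted(run_once(state, ["three", "four"], ["one", "three", "two", "four"])))  # ['four', 'three']
```

Keeping the left and right histories separate is what lets the model reproduce the second run: the Python connector contributes nothing new, yet the old Python rows still meet the two new plaintext lines.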
