(tmp_path: pathlib.Path)
| 1074 | |
| 1075 | |
def test_python_connector_persistence(tmp_path: pathlib.Path):
    """Run a Python-connector / plaintext-connector join three times with
    filesystem persistence and check that each run emits only the join results
    produced by that run's new input."""
    storage_dir = tmp_path / "PStorage"
    plaintext_file = tmp_path / "input.txt"
    csv_file = tmp_path / "output.txt"

    class TestSubject(pw.io.python.ConnectorSubject):
        # Emits every string it was constructed with, one raw row each.
        def __init__(self, items):
            super().__init__()
            self.items = items

        def run(self):
            for entry in self.items:
                self.next_str(entry)

    def run_computation(py_connector_input, fs_connector_input):
        # Build a fresh graph: Python rows joined on equality with the lines of
        # the plaintext file, written to CSV, executed with persistence enabled.
        G.clear()
        write_lines(plaintext_file, "\n".join(fs_connector_input))
        py_table = pw.io.python.read(
            TestSubject(py_connector_input), format="raw", name="1"
        )
        text_table = pw.io.plaintext.read(plaintext_file, name="2", mode="static")
        matches = py_table.join(text_table, py_table.data == text_table.data).select(
            py_table.data
        )
        pw.io.csv.write(matches, csv_file)
        backend = pw.persistence.Backend.filesystem(storage_dir)
        run(persistence_config=pw.persistence.Config(backend))

    # Each scenario: (new Python rows, full plaintext file contents, expected
    # set of "data" values in this run's CSV output).
    scenarios = [
        # First run: both sides contain "one", so that is the only pair.
        (["one", "two"], ["one", "three"], {"one"}),
        # Without persistence, the Python side would be empty and the join would
        # produce nothing. With persistence, ["one", "two"] is restored for the
        # Python side, and the first two plaintext lines were already consumed,
        # so only the new lines ["two", "four"] are joined against it: "two".
        ([], ["one", "three", "two", "four"], {"two"}),
        # The Python side now gains ["three", "four"] on top of the persisted
        # ["one", "two"]. The plaintext file has no new lines, but its earlier
        # contents are restored from persistence, so the new Python rows match
        # them and this run emits ["three", "four"].
        (["three", "four"], ["one", "three", "two", "four"], {"three", "four"}),
    ]

    for py_rows, text_lines, expected in scenarios:
        run_computation(py_rows, text_lines)
        output = pd.read_csv(csv_file)
        assert set(output["data"]) == expected
nothing calls this directly
no test coverage detected