(
tmp_path: pathlib.Path,
mode: api.PersistenceMode,
logic: Callable[[pw.Table, pw.Table], pw.Table],
schema: type[pw.Schema],
terminate_on_error: bool = True,
)
| 310 | |
| 311 | |
def get_two_tables_runner(
    tmp_path: pathlib.Path,
    mode: api.PersistenceMode,
    logic: Callable[[pw.Table, pw.Table], pw.Table],
    schema: type[pw.Schema],
    terminate_on_error: bool = True,
) -> tuple[
    Callable[[list[str], list[str], set[str]], None], pathlib.Path, pathlib.Path
]:
    """Build a reusable runner for a two-input persistent computation.

    Two input directories, ``tmp_path / "1"`` and ``tmp_path / "2"``, are
    created up front. The returned callable can then be invoked repeatedly;
    every invocation adds one new file to each input directory, rebuilds the
    pipeline and runs it with filesystem-backed persistence rooted at
    ``tmp_path / "p"``.

    Args:
        tmp_path: Base directory under which inputs, the output file and the
            persistence storage are placed.
        mode: Persistence mode forwarded to ``pw.persistence.Config``.
        logic: Function combining the two input tables into the result table.
        schema: Schema used to read both input directories.
        terminate_on_error: Forwarded unchanged to ``run``.

    Returns:
        A tuple ``(run_computation, input_path_1, input_path_2)`` where
        ``run_computation(inputs_1, inputs_2, expected)`` performs one run and
        checks the produced output against ``expected``.

    Raises:
        FileExistsError: If either input directory already exists, since
            ``os.makedirs`` is called without ``exist_ok``.
    """
    input_path_1 = tmp_path / "1"
    input_path_2 = tmp_path / "2"
    for input_dir in (input_path_1, input_path_2):
        os.makedirs(input_dir)
    output_path = tmp_path / "out.csv"
    persistent_storage_path = tmp_path / "p"
    run_index = 0  # number of runs performed so far; also names the input files

    def run_computation(inputs_1, inputs_2, expected):
        """Add one batch of lines per input, rerun the pipeline, check output."""
        nonlocal run_index
        run_index += 1
        # presumably resets the global computation graph between runs -- confirm
        G.clear()

        # Each run writes to a fresh file named after the run index, so files
        # written by earlier runs stay in place inside the input directories.
        batch_name = str(run_index)
        for input_dir, lines in (
            (input_path_1, inputs_1),
            (input_path_2, inputs_2),
        ):
            write_lines(input_dir / batch_name, lines)

        # The readers are given the whole directories, not just the new files.
        t_1, t_2 = [
            pw.io.csv.read(input_dir, schema=schema, mode="static")
            for input_dir in (input_path_1, input_path_2)
        ]
        pw.io.csv.write(logic(t_1, t_2), output_path)

        # NOTE(review): the original call site carried a trailing comment,
        # "hack to allow changes from different files at different point in
        # time", with no matching argument visible -- confirm whether an
        # argument was dropped here.
        persistence_config = pw.persistence.Config(
            pw.persistence.Backend.filesystem(persistent_storage_path),
            persistence_mode=mode,
        )
        run(
            persistence_config=persistence_config,
            terminate_on_error=terminate_on_error,
        )
        assert_sets_equality_from_path(output_path, expected)

    return run_computation, input_path_1, input_path_2
| 353 | |
| 354 | |
| 355 | @pytest.mark.parametrize( |
no outgoing calls
no test coverage detected