MCPcopy Index your code
hub / github.com/pathwaycom/pathway / get_two_tables_runner

Function get_two_tables_runner

python/pathway/tests/test_persistence.py:312–352  ·  view source on GitHub ↗
(
    tmp_path: pathlib.Path,
    mode: api.PersistenceMode,
    logic: Callable[[pw.Table, pw.Table], pw.Table],
    schema: type[pw.Schema],
    terminate_on_error: bool = True,
)

Source from the content-addressed store, hash-verified

310
311
def get_two_tables_runner(
    tmp_path: pathlib.Path,
    mode: api.PersistenceMode,
    logic: Callable[[pw.Table, pw.Table], pw.Table],
    schema: type[pw.Schema],
    terminate_on_error: bool = True,
) -> tuple[
    Callable[[list[str], list[str], set[str]], None], pathlib.Path, pathlib.Path
]:
    """Build a re-runnable two-input computation with filesystem persistence.

    Creates the input directories ``tmp_path / "1"`` and ``tmp_path / "2"``
    and returns a closure that, on each call, adds one new file to each
    directory, runs ``logic`` over both directories and checks the output.

    Args:
        tmp_path: Scratch directory holding the inputs, the output file
            (``out.csv``) and the persisted state (``p``).
        mode: Persistence mode forwarded to ``pw.persistence.Config``.
        logic: Combines the two input tables into the table written out.
        schema: Schema used when reading both CSV inputs.
        terminate_on_error: Forwarded unchanged to ``run``.

    Returns:
        ``(run_computation, input_path_1, input_path_2)``: the closure and
        the two input directories it reads from.

    Raises:
        FileExistsError: If either input directory already exists.
    """
    input_path_1 = tmp_path / "1"
    input_path_2 = tmp_path / "2"
    for input_dir in (input_path_1, input_path_2):
        os.makedirs(input_dir)
    output_path = tmp_path / "out.csv"
    persistent_storage_path = tmp_path / "p"
    count = 0  # runs performed so far; also used to name each run's files

    def run_computation(inputs_1, inputs_2, expected):
        """Add one input file per side, run the pipeline, check the output.

        ``inputs_1`` / ``inputs_2`` are written to a file named after the
        current run number inside the respective input directory. Both
        directories are then read in full, so files from earlier calls
        are read again.
        """
        nonlocal count
        count += 1
        G.clear()

        file_name = str(count)
        write_lines(input_path_1 / file_name, inputs_1)
        write_lines(input_path_2 / file_name, inputs_2)

        left = pw.io.csv.read(input_path_1, schema=schema, mode="static")
        right = pw.io.csv.read(input_path_2, schema=schema, mode="static")
        pw.io.csv.write(logic(left, right), output_path)

        # A fresh config is built on every call, pointing at the same
        # persistent storage directory.
        config = pw.persistence.Config(
            pw.persistence.Backend.filesystem(persistent_storage_path),
            persistence_mode=mode,
        )
        # hack to allow changes from different files at different point in time
        # NOTE(review): this comment sat at the end of the run() argument list
        # with no argument attached; confirm what it refers to.
        run(persistence_config=config, terminate_on_error=terminate_on_error)
        assert_sets_equality_from_path(output_path, expected)

    return run_computation, input_path_1, input_path_2
353
354
355@pytest.mark.parametrize(

Callers 8

test_restrict · Function · 0.85
test_with_universe_of · Function · 0.85
test_intersect · Function · 0.85
test_difference · Function · 0.85
test_update_rows · Function · 0.85
test_update_cells · Function · 0.85
test_join · Function · 0.85
test_from_streams · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected