(changes, expected_diffs)
| 98 | events: dict = {} |
| 99 | |
def do_run(changes, expected_diffs):
    """Apply a batch of event changes, rerun the pipeline, and verify its output.

    Each entry of ``changes`` maps an event id to an
    ``(event_time, flag, data)`` triple. The triple is recorded in the
    enclosing ``events`` dict and written as a one-row CSV file named
    ``<event id>.csv`` under ``input_path``. The pipeline is then rebuilt
    over the whole input directory and run with filesystem-backed
    persistence, and the written output is checked against
    ``expected_diffs``.
    """
    for eid, record in changes.items():
        # Record the change before unpacking, so ``events`` is updated even
        # if the triple turns out to be malformed.
        events[eid] = record
        et, flag, data = record
        csv_file = input_path / f"{eid}.csv"
        csv_rows = ["event_time,flag,data", f"{et},{flag},{data}"]
        write_lines(csv_file, csv_rows)

    # Presumably resets the global computation graph left over from a
    # previous invocation -- confirm against the definition of ``G``.
    G.clear()
    events_table = pw.io.csv.read(input_path, schema=EventSchema, mode="static")
    result = pipeline_fn(events_table)
    pw.io.csv.write(result, output_path)

    storage_backend = pw.persistence.Backend.filesystem(pstorage_path)
    persistence_config = pw.persistence.Config(
        storage_backend,
        persistence_mode=mode,
    )
    run(persistence_config=persistence_config)

    _assert_diffs_are_unit(output_path)
    _assert_diffs_match(output_path, expected_diffs)
| 121 | |
| 122 | return events, do_run |
| 123 |
no test coverage detected