()
| 504 | |
| 505 | |
def test_save_load_empty_trace() -> None:
    """Check that the trace of a run with no data survives a save/load round trip.

    The pipeline is run with an empty list against the "dummy" destination,
    the trace is read back from the pipeline working directory, and the
    loaded trace is compared with the in-memory one.
    """
    # NOTE(review): presumably these configure the dummy destination to
    # complete every job and skip restoring state from the destination -
    # confirm against the dummy destination / pipeline configuration.
    os.environ["COMPLETED_PROB"] = "1.0"
    os.environ["RESTORE_FROM_DESTINATION"] = "false"

    pipeline = dlt.pipeline()
    pipeline.run([], table_name="data", destination="dummy")
    trace = pipeline.last_trace
    assert_trace_serializable(trace)
    assert len(trace.steps) == 4

    pipeline.activate()

    # load trace and check if all elements are present
    loaded_trace = load_trace(pipeline.working_dir)
    print(loaded_trace.asstr(2))
    # Verify the trace read back from disk, not the in-memory one that was
    # already checked above.
    assert len(loaded_trace.steps) == 4

    # Snapshot both dict forms so later comparisons are against stable copies.
    loaded_trace_dict = deepcopy(loaded_trace.asdict())
    trace_dict = deepcopy(trace.asdict())
    assert loaded_trace_dict == trace_dict

    # do it again to check if we are not popping: a fresh asdict() call must
    # still equal the snapshot, i.e. asdict() did not remove anything from
    # the trace objects.
    assert loaded_trace_dict == loaded_trace.asdict()
    assert trace_dict == trace.asdict()
| 527 | |
| 528 | |
| 529 | def test_disable_trace(environment: DictStrStr) -> None: |
nothing calls this directly
no test coverage detected