()
| 410 | |
| 411 | |
def test_save_load_trace() -> None:
    """Check that a pipeline trace is saved and can be loaded back.

    Covers two scenarios:

    * a successful run: the trace obtained through ``last_trace`` on a fresh
      pipeline instance has the expected steps and resolved config values,
      is serializable, and equals the trace returned by ``load_trace``;
    * a failing run: a resource raising during extract still results in a
      saved trace whose ``extract`` and ``run`` steps carry the exception.
    """
    # set via environment so the value shows up in the trace's resolved
    # config values, which is asserted below
    os.environ["COMPLETED_PROB"] = "1.0"
    # delete all traces collected before test run
    get_resolved_traces().clear()

    info = dlt.pipeline().run([1, 2, 3], table_name="data", destination="dummy")
    pipeline = dlt.pipeline()
    # will get trace from working dir
    trace = pipeline.last_trace
    assert trace is not None
    # the new pipeline instance holds no in-memory trace of its own
    assert pipeline._trace is None
    assert len(trace.steps) == 4 == len(info.pipeline.last_trace.steps)  # type: ignore[attr-defined]
    step = trace.steps[-2]  # the previous to last one should be load
    assert step.step == "load"
    resolved = _find_resolved_value(trace.resolved_config_values, "completed_prob", [])
    assert resolved.is_secret_hint is False
    assert resolved.value == "1.0"
    assert resolved.config_type_name == "DummyClientConfiguration"
    assert_trace_serializable(trace)
    # check row counts
    assert pipeline.last_trace.last_normalize_info.row_counts == {
        "_dlt_pipeline_state": 1,
        "data": 3,
    }
    # reactivate the pipeline
    pipeline.activate()

    # load trace and check if all elements are present
    loaded_trace = load_trace(pipeline.working_dir)
    print(loaded_trace.asstr(2))
    # check the loaded trace itself; `trace` was already verified above
    assert len(loaded_trace.steps) == 4
    loaded_trace_dict = deepcopy(loaded_trace.asdict())
    trace_dict = deepcopy(trace.asdict())
    assert loaded_trace_dict == trace_dict
    # do it again to check if we are not popping
    assert loaded_trace_dict == loaded_trace.asdict()
    assert trace_dict == trace.asdict()

    # exception also saves trace
    @dlt.resource
    def data():
        raise NotImplementedError()
        # unreachable, but keeps `data` a generator function
        yield

    with pytest.raises(PipelineStepFailed) as py_ex:
        dlt.run(data(), destination="dummy")
    # there's the same pipeline in exception as in previous run
    assert py_ex.value.pipeline is info.pipeline
    trace = load_trace(py_ex.value.pipeline.working_dir)
    assert trace is not None
    assert pipeline._trace is None
    assert len(trace.steps) == 2  # extract with exception, also has run with exception
    step = trace.steps[-2]
    assert step.step == "extract"
    assert step.step_exception is not None
    run_step = trace.steps[-1]
    assert run_step.step == "run"
    assert run_step.step_exception is not None
nothing calls this directly
no test coverage detected