MCPcopy
hub / github.com/dlt-hub/dlt / test_save_load_trace

Function test_save_load_trace

tests/pipeline/test_pipeline_trace.py:412–472  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

410
411
412def test_save_load_trace() -> None:
413 os.environ["COMPLETED_PROB"] = "1.0"
414 # delete all traces collected before test run
415 get_resolved_traces().clear()
416
417 info = dlt.pipeline().run([1, 2, 3], table_name="data", destination="dummy")
418 pipeline = dlt.pipeline()
419 # will get trace from working dir
420 trace = pipeline.last_trace
421 assert trace is not None
422 assert pipeline._trace is None
423 assert len(trace.steps) == 4 == len(info.pipeline.last_trace.steps) # type: ignore[attr-defined]
424 step = trace.steps[-2] # the previous to last one should be load
425 assert step.step == "load"
426 resolved = _find_resolved_value(trace.resolved_config_values, "completed_prob", [])
427 assert resolved.is_secret_hint is False
428 assert resolved.value == "1.0"
429 assert resolved.config_type_name == "DummyClientConfiguration"
430 assert_trace_serializable(trace)
431 # check row counts
432 assert pipeline.last_trace.last_normalize_info.row_counts == {
433 "_dlt_pipeline_state": 1,
434 "data": 3,
435 }
436 # reactivate the pipeline
437 pipeline.activate()
438
439 # load trace and check if all elements are present
440 loaded_trace = load_trace(pipeline.working_dir)
441 print(loaded_trace.asstr(2))
442 assert len(trace.steps) == 4
443 loaded_trace_dict = deepcopy(loaded_trace.asdict())
444 trace_dict = deepcopy(trace.asdict())
445 assert loaded_trace_dict == trace_dict
446 # do it again to check if we are not popping
447 assert loaded_trace_dict == loaded_trace.asdict()
448 assert trace_dict == trace.asdict()
449
450 # exception also saves trace
451 @dlt.resource
452 def data():
453 raise NotImplementedError()
454 yield
455
456 with pytest.raises(PipelineStepFailed) as py_ex:
457 dlt.run(data(), destination="dummy")
458 # there's the same pipeline in exception as in previous run
459 assert py_ex.value.pipeline is info.pipeline
460 trace = load_trace(py_ex.value.pipeline.working_dir)
461 assert trace is not None
462 assert pipeline._trace is None
463 assert len(trace.steps) == 2 # extract with exception, also has run with exception
464 step = trace.steps[-2]
465 assert step.step == "extract"
466 assert step.step_exception is not None
467 run_step = trace.steps[-1]
468 assert run_step.step == "run"
469 assert run_step.step_exception is not None

Callers

nothing calls this directly

Calls 11

get_resolved_tracesFunction · 0.90
load_traceFunction · 0.90
_find_resolved_valueFunction · 0.85
clearMethod · 0.80
pipelineMethod · 0.80
dataFunction · 0.70
runMethod · 0.45
activateMethod · 0.45
asstrMethod · 0.45
asdictMethod · 0.45

Tested by

no test coverage detected