MCPcopy
hub / github.com/dlt-hub/dlt / test_save_load_empty_trace

Function test_save_load_empty_trace

tests/pipeline/test_pipeline_trace.py:506–526  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

504
505
506def test_save_load_empty_trace() -> None:
507 os.environ["COMPLETED_PROB"] = "1.0"
508 os.environ["RESTORE_FROM_DESTINATION"] = "false"
509 pipeline = dlt.pipeline()
510 pipeline.run([], table_name="data", destination="dummy")
511 trace = pipeline.last_trace
512 assert_trace_serializable(trace)
513 assert len(trace.steps) == 4
514
515 pipeline.activate()
516
517 # load trace and check if all elements are present
518 loaded_trace = load_trace(pipeline.working_dir)
519 print(loaded_trace.asstr(2))
520 assert len(trace.steps) == 4
521 loaded_trace_dict = deepcopy(loaded_trace.asdict())
522 trace_dict = deepcopy(trace.asdict())
523 assert loaded_trace_dict == trace_dict
524 # do it again to check if we are not popping
525 assert loaded_trace_dict == loaded_trace.asdict()
526 assert trace_dict == trace.asdict()
527
528
529def test_disable_trace(environment: DictStrStr) -> None:

Callers

nothing calls this directly

Calls 7

load_traceFunction · 0.90
pipelineMethod · 0.80
runMethod · 0.45
activateMethod · 0.45
asstrMethod · 0.45
asdictMethod · 0.45

Tested by

no test coverage detected