MCPcopy
hub / github.com/dlt-hub/dlt / test_run

Function test_run

tests/helpers/airflow_tests/test_airflow_wrapper.py:256–295  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

254
255
256def test_run() -> None:
257 task: BaseOperator = None
258
259 pipeline_standalone = dlt.pipeline(
260 pipeline_name="pipeline_standalone",
261 dataset_name="mock_data_" + uniq_id(),
262 destination=dlt.destinations.duckdb(credentials=":pipeline:"),
263 )
264 pipeline_standalone.run(mock_data_source())
265 pipeline_standalone_counts = load_table_counts(pipeline_standalone)
266
267 @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
268 def dag_regular():
269 nonlocal task
270 tasks = PipelineTasksGroup(
271 "pipeline_dag_regular", local_data_folder=get_test_storage_root(), wipe_local_data=False
272 )
273
274 # set duckdb to be outside of pipeline folder which is dropped on each task
275 pipeline_dag_regular = dlt.pipeline(
276 pipeline_name="pipeline_dag_regular",
277 dataset_name="mock_data_" + uniq_id(),
278 destination=dlt.destinations.duckdb(destination_name="dag_regular_b"),
279 )
280 task = tasks.run(pipeline_dag_regular, mock_data_source())
281
282 dag_def: DAG = dag_regular()
283 assert task.task_id == "mock_data_source__r_init-_t_init_post-_t1-_t2-2-more"
284
285 dag_def.test()
286
287 pipeline_dag_regular = dlt.attach(
288 pipeline_name="pipeline_dag_regular",
289 )
290 assert pipeline_dag_regular.first_run is False
291
292 pipeline_dag_regular_counts = load_table_counts(pipeline_dag_regular)
293 assert pipeline_dag_regular_counts == pipeline_standalone_counts
294
295 assert isinstance(task, PythonOperator)
296
297
298@pytest.mark.parametrize("serialize_first_task", [True, False])

Callers

nothing calls this directly

Calls 7

uniq_idFunction · 0.90
load_table_countsFunction · 0.90
mock_data_sourceFunction · 0.85
pipelineMethod · 0.80
testMethod · 0.80
dag_regularFunction · 0.70
runMethod · 0.45

Tested by

no test coverage detected