MCPcopy
hub / github.com/dlt-hub/dlt / test_regular_run

Function test_regular_run

tests/helpers/airflow_tests/test_airflow_wrapper.py:165–253  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

163
164
165def test_regular_run() -> None:
166 # run the pipeline normally
167 pipeline_standalone = dlt.pipeline(
168 pipeline_name="pipeline_standalone",
169 dataset_name="mock_data_" + uniq_id(),
170 destination=dlt.destinations.duckdb(credentials=":pipeline:"),
171 )
172 pipeline_standalone.run(mock_data_source())
173 pipeline_standalone_counts = load_table_counts(pipeline_standalone)
174
175 tasks_list: List[BaseOperator] = None
176
177 @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
178 def dag_regular():
179 nonlocal tasks_list
180 tasks = PipelineTasksGroup(
181 "pipeline_dag_regular", local_data_folder=get_test_storage_root(), wipe_local_data=False
182 )
183
184 pipeline_dag_regular = dlt.pipeline(
185 pipeline_name="pipeline_dag_regular",
186 dataset_name="mock_data_" + uniq_id(),
187 destination=dlt.destinations.duckdb(credentials=":pipeline:"),
188 )
189 assert pipeline_dag_regular.get_local_state_val("initial_cwd").startswith(
190 os.path.abspath(get_test_storage_root())
191 )
192 tasks_list = tasks.add_run(
193 pipeline_dag_regular,
194 mock_data_source(),
195 decompose="none",
196 trigger_rule="all_done",
197 retries=0,
198 )
199
200 dag_def: DAG = dag_regular()
201 assert len(tasks_list) == 1
202 # composite task name
203 assert (
204 tasks_list[0].task_id
205 == "pipeline_dag_regular.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more"
206 )
207
208 dag_def.test()
209 # we should be able to attach to pipeline state created within Airflow
210
211 pipeline_dag_regular = dlt.attach(
212 pipeline_name="pipeline_dag_regular",
213 destination=dlt.destinations.duckdb(credentials=":pipeline:"),
214 )
215 pipeline_dag_regular_counts = load_table_counts(pipeline_dag_regular)
216 # same data should be loaded
217 assert pipeline_dag_regular_counts == pipeline_standalone_counts
218
219 @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
220 def dag_decomposed():
221 nonlocal tasks_list
222 tasks = PipelineTasksGroup(

Callers

nothing calls this directly

Calls 8

uniq_idFunction · 0.90
load_table_countsFunction · 0.90
mock_data_sourceFunction · 0.85
dag_decomposedFunction · 0.85
pipelineMethod · 0.80
testMethod · 0.80
dag_regularFunction · 0.70
runMethod · 0.45

Tested by

no test coverage detected