()
| 163 | |
| 164 | |
| 165 | def test_regular_run() -> None: |
| 166 | # run the pipeline normally |
| 167 | pipeline_standalone = dlt.pipeline( |
| 168 | pipeline_name="pipeline_standalone", |
| 169 | dataset_name="mock_data_" + uniq_id(), |
| 170 | destination=dlt.destinations.duckdb(credentials=":pipeline:"), |
| 171 | ) |
| 172 | pipeline_standalone.run(mock_data_source()) |
| 173 | pipeline_standalone_counts = load_table_counts(pipeline_standalone) |
| 174 | |
| 175 | tasks_list: List[BaseOperator] = None |
| 176 | |
| 177 | @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args) |
| 178 | def dag_regular(): |
| 179 | nonlocal tasks_list |
| 180 | tasks = PipelineTasksGroup( |
| 181 | "pipeline_dag_regular", local_data_folder=get_test_storage_root(), wipe_local_data=False |
| 182 | ) |
| 183 | |
| 184 | pipeline_dag_regular = dlt.pipeline( |
| 185 | pipeline_name="pipeline_dag_regular", |
| 186 | dataset_name="mock_data_" + uniq_id(), |
| 187 | destination=dlt.destinations.duckdb(credentials=":pipeline:"), |
| 188 | ) |
| 189 | assert pipeline_dag_regular.get_local_state_val("initial_cwd").startswith( |
| 190 | os.path.abspath(get_test_storage_root()) |
| 191 | ) |
| 192 | tasks_list = tasks.add_run( |
| 193 | pipeline_dag_regular, |
| 194 | mock_data_source(), |
| 195 | decompose="none", |
| 196 | trigger_rule="all_done", |
| 197 | retries=0, |
| 198 | ) |
| 199 | |
| 200 | dag_def: DAG = dag_regular() |
| 201 | assert len(tasks_list) == 1 |
| 202 | # composite task name |
| 203 | assert ( |
| 204 | tasks_list[0].task_id |
| 205 | == "pipeline_dag_regular.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more" |
| 206 | ) |
| 207 | |
| 208 | dag_def.test() |
| 209 | # we should be able to attach to pipeline state created within Airflow |
| 210 | |
| 211 | pipeline_dag_regular = dlt.attach( |
| 212 | pipeline_name="pipeline_dag_regular", |
| 213 | destination=dlt.destinations.duckdb(credentials=":pipeline:"), |
| 214 | ) |
| 215 | pipeline_dag_regular_counts = load_table_counts(pipeline_dag_regular) |
| 216 | # same data should be loaded |
| 217 | assert pipeline_dag_regular_counts == pipeline_standalone_counts |
| 218 | |
| 219 | @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args) |
| 220 | def dag_decomposed(): |
| 221 | nonlocal tasks_list |
| 222 | tasks = PipelineTasksGroup( |
nothing calls this directly
no test coverage detected