()
| 254 | |
| 255 | |
def test_run() -> None:
    """Compare a DAG-driven pipeline run against a standalone run of the same source.

    The test first runs ``mock_data_source()`` through a standalone pipeline and
    records the result of ``load_table_counts`` as the baseline. It then defines
    a DAG whose body runs the same source through ``PipelineTasksGroup.run``,
    checks the id of the task that call returned, executes the DAG with
    ``DAG.test()``, re-attaches to the pipeline by name and asserts that its
    table counts equal the baseline.
    """
    # Populated from inside the DAG function below via ``nonlocal``.
    task: BaseOperator = None

    # Baseline: run the source directly, without Airflow.
    standalone = dlt.pipeline(
        pipeline_name="pipeline_standalone",
        dataset_name="mock_data_" + uniq_id(),
        destination=dlt.destinations.duckdb(credentials=":pipeline:"),
    )
    standalone.run(mock_data_source())
    expected_counts = load_table_counts(standalone)

    @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
    def dag_regular():
        nonlocal task
        group = PipelineTasksGroup(
            "pipeline_dag_regular",
            local_data_folder=get_test_storage_root(),
            wipe_local_data=False,
        )

        # Keep duckdb outside of the pipeline folder, which is dropped on each task.
        dag_pipeline = dlt.pipeline(
            pipeline_name="pipeline_dag_regular",
            dataset_name="mock_data_" + uniq_id(),
            destination=dlt.destinations.duckdb(destination_name="dag_regular_b"),
        )
        source = mock_data_source()
        task = group.run(dag_pipeline, source)

    # Calling the decorated function builds the DAG and, as a side effect,
    # assigns ``task``; the id is checked before the DAG is executed.
    dag_def: DAG = dag_regular()
    assert task.task_id == "mock_data_source__r_init-_t_init_post-_t1-_t2-2-more"

    dag_def.test()

    # Re-attach by name to inspect the pipeline as it was left by the DAG run.
    attached = dlt.attach(pipeline_name="pipeline_dag_regular")
    assert attached.first_run is False

    actual_counts = load_table_counts(attached)
    assert actual_counts == expected_counts

    assert isinstance(task, PythonOperator)
| 297 | |
| 298 | @pytest.mark.parametrize("serialize_first_task", [True, False]) |
nothing calls this directly
no test coverage detected