()
| 989 | |
| 990 | |
def test_on_before_run() -> None:
    """Verify that running a DAG built with an `on_before_run` hook logs the hook message.

    The DAG contains a single `PipelineTasksGroup` that runs `mock_data_source`
    through a pipeline writing to a local duckdb file. The DAG is executed with
    `dlt.helpers.airflow_helper.logger.info` patched, and the test asserts that
    the patched logger received `on_before_run test: <tomorrow's date>`.
    """
    duckdb_file = os.path.join(get_test_storage_root(), "callable_dag.duckdb")

    @dag(schedule=None, start_date=DEFAULT_DATE, catchup=False, default_args=default_args)
    def dag_regular():
        # The task group is instantiated first and the pipeline afterwards;
        # keep this order.
        group = PipelineTasksGroup(
            "callable_dag_group",
            local_data_folder=get_test_storage_root(),
            wipe_local_data=False,
        )

        dataset = "mock_data_" + uniq_id()
        destination = dlt.destinations.duckdb(credentials=duckdb_file)
        pipeline = dlt.pipeline(
            pipeline_name="callable_dag",
            dataset_name=dataset,
            destination=destination,
        )
        group.run(pipeline, mock_data_source, on_before_run=on_before_run)

    dag_def: DAG = dag_regular()

    with mock.patch("dlt.helpers.airflow_helper.logger.info") as logger_mock:
        dag_def.test()
        # The expected date is computed only after the DAG run has finished.
        tomorrow = pendulum.tomorrow().format("YYYY-MM-DD")
        expected_calls = [mock.call(f"on_before_run test: {tomorrow}")]
        logger_mock.assert_has_calls(expected_calls)
| 1016 | |
| 1017 | |
| 1018 | def test_pipeline_created_before_task_group_raises() -> None: |
nothing calls this directly
no test coverage detected