Test that the error 'Task id {id} has already been added to the DAG' does not occur when the same source is added to the DAG more than once.
()
| 868 | |
| 869 | |
| 870 | def test_task_already_added(): |
| 871 | """ |
| 872 | Test that the error 'Task id {id} has already been added to the DAG' |
| 873 | is not happening while adding two same sources. |
| 874 | """ |
| 875 | tasks_list: List[BaseOperator] = None |
| 876 | |
| 877 | @dag(schedule=None, start_date=pendulum.today(), catchup=False) |
| 878 | def dag_parallel(): |
| 879 | nonlocal tasks_list |
| 880 | test_storage_root = get_test_storage_root() |
| 881 | |
| 882 | tasks = PipelineTasksGroup( |
| 883 | "test_pipeline", |
| 884 | local_data_folder=test_storage_root, |
| 885 | wipe_local_data=False, |
| 886 | ) |
| 887 | |
| 888 | source = mock_data_source() |
| 889 | |
| 890 | pipe = dlt.pipeline( |
| 891 | pipeline_name="test_pipeline", |
| 892 | dataset_name="mock_data", |
| 893 | destination=dlt.destinations.duckdb( |
| 894 | credentials=os.path.join(test_storage_root, "test_pipeline.duckdb") |
| 895 | ), |
| 896 | ) |
| 897 | task = tasks.add_run( |
| 898 | pipe, |
| 899 | source, |
| 900 | decompose="none", |
| 901 | trigger_rule="all_done", |
| 902 | retries=0, |
| 903 | )[0] |
| 904 | assert task.task_id == "test_pipeline.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more" |
| 905 | |
| 906 | task = tasks.add_run( |
| 907 | pipe, |
| 908 | source, |
| 909 | decompose="none", |
| 910 | trigger_rule="all_done", |
| 911 | retries=0, |
| 912 | )[0] |
| 913 | assert ( |
| 914 | task.task_id == "test_pipeline.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more-2" |
| 915 | ) |
| 916 | |
| 917 | tasks_list = tasks.add_run( |
| 918 | pipe, |
| 919 | source, |
| 920 | decompose="none", |
| 921 | trigger_rule="all_done", |
| 922 | retries=0, |
| 923 | ) |
| 924 | assert ( |
| 925 | tasks_list[0].task_id |
| 926 | == "test_pipeline.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more-3" |
| 927 | ) |
nothing calls this directly
no test coverage detected