MCPcopy
hub / github.com/dlt-hub/dlt / test_task_already_added

Function test_task_already_added

tests/helpers/airflow_tests/test_airflow_wrapper.py:870–931  ·  view source on GitHub ↗

Test that the error 'Task id {id} has already been added to the DAG' is not happening while adding two same sources.

()

Source from the content-addressed store, hash-verified

868
869
870def test_task_already_added():
871 """
872 Test that the error 'Task id {id} has already been added to the DAG'
873 is not happening while adding two same sources.
874 """
875 tasks_list: List[BaseOperator] = None
876
877 @dag(schedule=None, start_date=pendulum.today(), catchup=False)
878 def dag_parallel():
879 nonlocal tasks_list
880 test_storage_root = get_test_storage_root()
881
882 tasks = PipelineTasksGroup(
883 "test_pipeline",
884 local_data_folder=test_storage_root,
885 wipe_local_data=False,
886 )
887
888 source = mock_data_source()
889
890 pipe = dlt.pipeline(
891 pipeline_name="test_pipeline",
892 dataset_name="mock_data",
893 destination=dlt.destinations.duckdb(
894 credentials=os.path.join(test_storage_root, "test_pipeline.duckdb")
895 ),
896 )
897 task = tasks.add_run(
898 pipe,
899 source,
900 decompose="none",
901 trigger_rule="all_done",
902 retries=0,
903 )[0]
904 assert task.task_id == "test_pipeline.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more"
905
906 task = tasks.add_run(
907 pipe,
908 source,
909 decompose="none",
910 trigger_rule="all_done",
911 retries=0,
912 )[0]
913 assert (
914 task.task_id == "test_pipeline.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more-2"
915 )
916
917 tasks_list = tasks.add_run(
918 pipe,
919 source,
920 decompose="none",
921 trigger_rule="all_done",
922 retries=0,
923 )
924 assert (
925 tasks_list[0].task_id
926 == "test_pipeline.mock_data_source__r_init-_t_init_post-_t1-_t2-2-more-3"
927 )

Callers

nothing calls this directly

Calls 2

dag_parallelFunction · 0.85
testMethod · 0.80

Tested by

no test coverage detected