(cloned_init_repo: FileStorage)
| 319 | |
| 320 | |
def test_init_all_sources_isolated(cloned_init_repo: FileStorage) -> None:
    """Run `init_command` once per source candidate, each in a cleaned workspace.

    Candidates are the union of what `get_source_candidates` reports for a
    throwaway repo dir, `CORE_SOURCES` and `TEMPLATES`. Before each run every
    entry of the current working directory except `.dlt` and `.global_dir`
    is deleted, so files produced for one candidate are not present when the
    next one is initialized.
    """
    # A throwaway repo dir is only needed to enumerate the candidates.
    enumeration_repo = get_repo_dir(cloned_init_repo, f"verified_sources_repo_{uniq_id()}")
    # Cover sources from the verified sources repo as well as core sources and templates.
    all_candidates = (
        set(get_source_candidates(enumeration_repo)) | set(CORE_SOURCES) | set(TEMPLATES)
    )

    # Enumeration is done; drop the throwaway repo dir.
    shutil.rmtree(enumeration_repo, ignore_errors=True)

    for candidate in all_candidates:
        # Wipe the workspace, sparing only the .dlt and .global_dir entries.
        workspace = os.getcwd()
        for entry in os.listdir(workspace):
            if entry in (".dlt", ".global_dir"):
                continue
            entry_path = os.path.join(workspace, entry)
            if not os.path.isdir(entry_path):
                os.remove(entry_path)
                continue
            shutil.rmtree(entry_path, ignore_errors=True)

        # Each candidate gets its own freshly named repo dir.
        candidate_repo = get_repo_dir(cloned_init_repo, f"verified_sources_repo_{uniq_id()}")
        workspace_files = get_workspace_files(clear_all_sources=False)
        _init_command.init_command(candidate, "bigquery", candidate_repo)
        assert_source_files(workspace_files, candidate, "bigquery")
        assert_requirements_txt(workspace_files, "bigquery")

        # The index version constraint is only asserted for candidates that are
        # neither core sources nor templates.
        is_core_or_template = candidate in CORE_SOURCES or candidate in TEMPLATES
        if not is_core_or_template:
            assert_index_version_constraint(workspace_files, candidate)
| 349 | |
| 350 | |
| 351 | def test_init_core_sources_ejected(cloned_init_repo: FileStorage) -> None: |
nothing calls this directly
no test coverage detected