MCPcopy
hub / github.com/dlt-hub/dlt / test_init_all_sources_isolated

Function test_init_all_sources_isolated

tests/workspace/cli/test_init_command.py:321–348  ·  view source on GitHub ↗
(cloned_init_repo: FileStorage)

Source from the content-addressed store, hash-verified

319
320
def test_init_all_sources_isolated(cloned_init_repo: FileStorage) -> None:
    """Run ``init`` for every source candidate, each in a freshly wiped workspace.

    The candidates are the union of what ``get_source_candidates`` reports for
    the repo dir, ``CORE_SOURCES`` and ``TEMPLATES``. Before each candidate the
    current working directory is emptied (``.dlt`` and ``.global_dir`` are
    kept), a new uniquely named repo dir is requested, and the candidate is
    initialized against the ``"bigquery"`` destination. The index version
    constraint is asserted only for candidates that are neither core sources
    nor templates.
    """
    # This first repo dir is needed only to enumerate the source candidates
    listing_repo_dir = get_repo_dir(cloned_init_repo, f"verified_sources_repo_{uniq_id()}")
    # make sure sources from verified sources, core sources and templates are all covered
    all_candidates = (
        set(get_source_candidates(listing_repo_dir)) | set(CORE_SOURCES) | set(TEMPLATES)
    )
    # The enumeration repo dir has served its purpose
    shutil.rmtree(listing_repo_dir, ignore_errors=True)

    for candidate in all_candidates:
        # Wipe the workspace, keeping only .dlt and .global_dir
        for entry in os.listdir(os.getcwd()):
            if entry in [".dlt", ".global_dir"]:
                continue
            entry_path = os.path.join(os.getcwd(), entry)
            if not os.path.isdir(entry_path):
                os.remove(entry_path)
            else:
                shutil.rmtree(entry_path, ignore_errors=True)

        # Every candidate gets its own uniquely named repo dir
        candidate_repo_dir = get_repo_dir(
            cloned_init_repo, f"verified_sources_repo_{uniq_id()}"
        )
        files = get_workspace_files(clear_all_sources=False)
        _init_command.init_command(candidate, "bigquery", candidate_repo_dir)
        assert_source_files(files, candidate, "bigquery")
        assert_requirements_txt(files, "bigquery")
        if candidate in CORE_SOURCES + TEMPLATES:
            # core sources and templates are exempt from the index version check
            continue
        assert_index_version_constraint(files, candidate)
349
350
351def test_init_core_sources_ejected(cloned_init_repo: FileStorage) -> None:

Callers

nothing calls this directly

Calls 8

get_repo_dirFunction · 0.90
uniq_idFunction · 0.90
get_workspace_filesFunction · 0.90
get_source_candidatesFunction · 0.85
assert_source_filesFunction · 0.85
assert_requirements_txtFunction · 0.85
joinMethod · 0.80

Tested by

no test coverage detected