MCPcopy
hub / github.com/dlt-hub/dlt / pipeline_command

Function pipeline_command

dlt/_workspace/cli/_pipeline_command.py:63–456  ·  view source on GitHub ↗
(
    operation: str,
    pipeline_name: str,
    pipelines_dir: str,
    verbosity: int,
    dataset_name: str = None,
    destination: TDestinationReferenceArg = None,
    **command_kwargs: Any,
)

Source from the content-addressed store, hash-verified

61
62
63def pipeline_command(
64 operation: str,
65 pipeline_name: str,
66 pipelines_dir: str,
67 verbosity: int,
68 dataset_name: str = None,
69 destination: TDestinationReferenceArg = None,
70 **command_kwargs: Any,
71) -> None:
72 if operation == "list":
73 list_pipelines(pipelines_dir)
74 return
75
76 # we may open the dashboard for a pipeline without checking if it exists
77 if operation == "show":
78 if not utils.is_hub_available():
79 return
80
81 from dlt._workspace.helpers.dashboard.runner import run_dashboard
82
83 run_dashboard(pipeline_name, edit=command_kwargs.get("edit"), pipelines_dir=pipelines_dir)
84 return
85
86 try:
87 if verbosity > 0:
88 fmt.echo("Attaching to pipeline %s" % fmt.bold(pipeline_name))
89 p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir)
90 except CannotRestorePipelineException as e:
91 if operation not in {"sync", "drop"}:
92 raise
93 fmt.warning(str(e))
94 if not fmt.confirm(
95 "Do you want to attempt to restore the pipeline state from destination?",
96 default=False,
97 ):
98 return
99 destination = destination or fmt.text_input(
100 f"Enter destination name for pipeline {fmt.bold(pipeline_name)}"
101 )
102 dataset_name = dataset_name or fmt.text_input(
103 f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}"
104 )
105 p = dlt.pipeline(
106 pipeline_name,
107 pipelines_dir,
108 destination=destination,
109 dataset_name=dataset_name,
110 )
111 p.sync_destination()
112 if p.first_run:
113 # remote state was not found
114 p._wipe_working_folder()
115 fmt.error(
116 f"Pipeline {pipeline_name} was not found in dataset {dataset_name} in {destination}"
117 )
118 return
119 if operation == "sync":
120 return # No need to sync again

Callers 1

pipeline_command_wrapperFunction · 0.85

Calls 15

run_dashboardFunction · 0.90
PipelineMCPClass · 0.90
group_tables_by_resourceFunction · 0.90
resource_stateFunction · 0.90
is_complete_columnFunction · 0.90
has_table_seen_dataFunction · 0.90
remove_defaultsFunction · 0.90
pipeline_dropClass · 0.90
pipelineMethod · 0.80
sync_destinationMethod · 0.80

Tested by

no test coverage detected