(
operation: str,
pipeline_name: str,
pipelines_dir: str,
verbosity: int,
dataset_name: str = None,
destination: TDestinationReferenceArg = None,
**command_kwargs: Any,
)
| 61 | |
| 62 | |
| 63 | def pipeline_command( |
| 64 | operation: str, |
| 65 | pipeline_name: str, |
| 66 | pipelines_dir: str, |
| 67 | verbosity: int, |
| 68 | dataset_name: str = None, |
| 69 | destination: TDestinationReferenceArg = None, |
| 70 | **command_kwargs: Any, |
| 71 | ) -> None: |
| 72 | if operation == "list": |
| 73 | list_pipelines(pipelines_dir) |
| 74 | return |
| 75 | |
| 76 | # we may open the dashboard for a pipeline without checking if it exists |
| 77 | if operation == "show": |
| 78 | if not utils.is_hub_available(): |
| 79 | return |
| 80 | |
| 81 | from dlt._workspace.helpers.dashboard.runner import run_dashboard |
| 82 | |
| 83 | run_dashboard(pipeline_name, edit=command_kwargs.get("edit"), pipelines_dir=pipelines_dir) |
| 84 | return |
| 85 | |
| 86 | try: |
| 87 | if verbosity > 0: |
| 88 | fmt.echo("Attaching to pipeline %s" % fmt.bold(pipeline_name)) |
| 89 | p = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=pipelines_dir) |
| 90 | except CannotRestorePipelineException as e: |
| 91 | if operation not in {"sync", "drop"}: |
| 92 | raise |
| 93 | fmt.warning(str(e)) |
| 94 | if not fmt.confirm( |
| 95 | "Do you want to attempt to restore the pipeline state from destination?", |
| 96 | default=False, |
| 97 | ): |
| 98 | return |
| 99 | destination = destination or fmt.text_input( |
| 100 | f"Enter destination name for pipeline {fmt.bold(pipeline_name)}" |
| 101 | ) |
| 102 | dataset_name = dataset_name or fmt.text_input( |
| 103 | f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}" |
| 104 | ) |
| 105 | p = dlt.pipeline( |
| 106 | pipeline_name, |
| 107 | pipelines_dir, |
| 108 | destination=destination, |
| 109 | dataset_name=dataset_name, |
| 110 | ) |
| 111 | p.sync_destination() |
| 112 | if p.first_run: |
| 113 | # remote state was not found |
| 114 | p._wipe_working_folder() |
| 115 | fmt.error( |
| 116 | f"Pipeline {pipeline_name} was not found in dataset {dataset_name} in {destination}" |
| 117 | ) |
| 118 | return |
| 119 | if operation == "sync": |
| 120 | return # No need to sync again |
no test coverage detected