(args: argparse.Namespace, run_dir: Path, params_path: Path)
| 185 | |
| 186 | |
| 187 | def build_command(args: argparse.Namespace, run_dir: Path, params_path: Path) -> str: |
| 188 | workflow = NFCORE_PIPELINES[args.pipeline]["workflow"] |
| 189 | cmd: list[str | Path] = [ |
| 190 | "nextflow", |
| 191 | "run", |
| 192 | workflow, |
| 193 | "-params-file", |
| 194 | params_path, |
| 195 | "-work-dir", |
| 196 | run_dir / "work", |
| 197 | "-with-report", |
| 198 | run_dir / "workflow" / "nextflow_report.html", |
| 199 | "-with-timeline", |
| 200 | run_dir / "workflow" / "timeline.html", |
| 201 | "-with-trace", |
| 202 | run_dir / "workflow" / "trace.txt", |
| 203 | "-with-dag", |
| 204 | run_dir / "workflow" / "dag.html", |
| 205 | ] |
| 206 | if args.revision: |
| 207 | cmd.extend(["-r", args.revision]) |
| 208 | if args.profile: |
| 209 | cmd.extend(["-profile", args.profile]) |
| 210 | base = shell_join(cmd) |
| 211 | if args.nextflow_arg: |
| 212 | base = " ".join([base, *args.nextflow_arg]) |
| 213 | return base |
| 214 | |
| 215 | |
| 216 | def execute_command(run_dir: Path, command: str) -> dict[str, Any]: |
no test coverage detected