Show workflow details and step graph.
(
workflow_id: str = typer.Argument(..., help="Workflow ID"),
)
| 913 | |
| 914 | @workflow_app.command("info") |
| 915 | def workflow_info( |
| 916 | workflow_id: str = typer.Argument(..., help="Workflow ID"), |
| 917 | ): |
| 918 | """Show workflow details and step graph.""" |
| 919 | from .catalog import WorkflowCatalog, WorkflowRegistry, WorkflowCatalogError |
| 920 | from .engine import WorkflowEngine |
| 921 | |
| 922 | project_root = _require_specify_project() |
| 923 | |
| 924 | # Check installed first |
| 925 | registry = WorkflowRegistry(project_root) |
| 926 | installed = registry.get(workflow_id) |
| 927 | |
| 928 | engine = WorkflowEngine(project_root) |
| 929 | |
| 930 | definition = None |
| 931 | try: |
| 932 | definition = engine.load_workflow(workflow_id) |
| 933 | except FileNotFoundError: |
| 934 | # Local workflow definition not found on disk; fall back to |
| 935 | # catalog/registry lookup below. |
| 936 | pass |
| 937 | |
| 938 | if definition: |
| 939 | console.print(f"\n[bold cyan]{definition.name}[/bold cyan] ({definition.id})") |
| 940 | console.print(f" Version: {definition.version}") |
| 941 | if definition.author: |
| 942 | console.print(f" Author: {definition.author}") |
| 943 | if definition.description: |
| 944 | console.print(f" Description: {definition.description}") |
| 945 | if definition.default_integration: |
| 946 | console.print(f" Integration: {definition.default_integration}") |
| 947 | if installed: |
| 948 | console.print(" [green]Installed[/green]") |
| 949 | |
| 950 | if definition.inputs: |
| 951 | console.print("\n [bold]Inputs:[/bold]") |
| 952 | for name, inp in definition.inputs.items(): |
| 953 | if isinstance(inp, dict): |
| 954 | req = "required" if inp.get("required") else "optional" |
| 955 | console.print(f" {name} ({inp.get('type', 'string')}) — {req}") |
| 956 | |
| 957 | if definition.steps: |
| 958 | console.print(f"\n [bold]Steps ({len(definition.steps)}):[/bold]") |
| 959 | for step in definition.steps: |
| 960 | stype = step.get("type", "command") |
| 961 | console.print(f" → {step.get('id', '?')} [{stype}]") |
| 962 | return |
| 963 | |
| 964 | # Try catalog |
| 965 | catalog = WorkflowCatalog(project_root) |
| 966 | try: |
| 967 | info = catalog.get_workflow_info(workflow_id) |
| 968 | except WorkflowCatalogError: |
| 969 | info = None |
| 970 | |
| 971 | if info: |
| 972 | console.print(f"\n[bold cyan]{info.get('name', workflow_id)}[/bold cyan] ({workflow_id})") |
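Nothing calls this function directly; it is registered as the `info` command on `workflow_app` via the `@workflow_app.command("info")` decorator, so it is invoked through the CLI rather than from other code.
No test coverage detected.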