MCPcopy
hub / github.com/StructuredLabs/preswald / WorkflowAnalyzer

Class WorkflowAnalyzer

preswald/interfaces/workflow.py:604–851  ·  view source on GitHub ↗

Provides visualization and analysis capabilities for workflow structures. Uses Plotly for interactive visualization and NetworkX for graph algorithms.

Source from the content-addressed store, hash-verified

602
603
604class WorkflowAnalyzer:
605 """
606 Provides visualization and analysis capabilities for workflow structures.
607 Uses Plotly for interactive visualization and NetworkX for graph algorithms.
608 """
609
610 def __init__(self, workflow):
611 self.workflow = workflow
612 self.graph = nx.DiGraph()
613 self._last_analysis_time = None
614
615 # Define color scheme for different atom statuses
616 self.status_colors = {
617 "pending": "#E8E8E8", # Light Gray
618 "running": "#72B0DD", # Blue
619 "completed": "#72B7B7", # Teal
620 "failed": "#B76E79", # Rose
621 "retry": "#FFB347", # Orange
622 "skipped": "#D7BDE2", # Light Purple
623 "not_executed": "#C8C8C8", # Gray
624 }
625
626 def build_graph(self) -> nx.DiGraph:
627 """
628 Constructs a NetworkX graph representation of the workflow.
629 Includes rich metadata for visualization and analysis.
630 """
631 self.graph.clear()
632
633 # Add nodes (atoms) with their metadata
634 for atom_name, atom in self.workflow.atoms.items():
635 result = self.workflow.context.results.get(atom_name)
636
637 # Prepare node metadata with rich information for tooltips
638 node_data = {
639 "name": atom_name,
640 "status": result.status.value if result else "not_executed",
641 "execution_time": (
642 f"{result.execution_time:.2f}s"
643 if result and result.execution_time
644 else "N/A"
645 ),
646 "attempts": result.attempts if result else 0,
647 "error": str(result.error) if result and result.error else None,
648 "dependencies": list(atom.dependencies),
649 "force_recompute": atom.force_recompute,
650 }
651
652 self.graph.add_node(atom_name, **node_data)
653
654 # Add edges for dependencies
655 for dep in atom.dependencies:
656 self.graph.add_edge(dep, atom_name)
657
658 self._last_analysis_time = datetime.now()
659 return self.graph
660
661 def get_critical_path(self) -> list[str]:

Callers 2

workflow_analyzer_demoFunction · 0.90
workflow_dagFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected