Provides visualization and analysis capabilities for workflow structures. Uses Plotly for interactive visualization and NetworkX for graph algorithms.
| 602 | |
| 603 | |
| 604 | class WorkflowAnalyzer: |
| 605 | """ |
| 606 | Provides visualization and analysis capabilities for workflow structures. |
| 607 | Uses Plotly for interactive visualization and NetworkX for graph algorithms. |
| 608 | """ |
| 609 | |
| 610 | def __init__(self, workflow): |
| 611 | self.workflow = workflow |
| 612 | self.graph = nx.DiGraph() |
| 613 | self._last_analysis_time = None |
| 614 | |
| 615 | # Define color scheme for different atom statuses |
| 616 | self.status_colors = { |
| 617 | "pending": "#E8E8E8", # Light Gray |
| 618 | "running": "#72B0DD", # Blue |
| 619 | "completed": "#72B7B7", # Teal |
| 620 | "failed": "#B76E79", # Rose |
| 621 | "retry": "#FFB347", # Orange |
| 622 | "skipped": "#D7BDE2", # Light Purple |
| 623 | "not_executed": "#C8C8C8", # Gray |
| 624 | } |
| 625 | |
| 626 | def build_graph(self) -> nx.DiGraph: |
| 627 | """ |
| 628 | Constructs a NetworkX graph representation of the workflow. |
| 629 | Includes rich metadata for visualization and analysis. |
| 630 | """ |
| 631 | self.graph.clear() |
| 632 | |
| 633 | # Add nodes (atoms) with their metadata |
| 634 | for atom_name, atom in self.workflow.atoms.items(): |
| 635 | result = self.workflow.context.results.get(atom_name) |
| 636 | |
| 637 | # Prepare node metadata with rich information for tooltips |
| 638 | node_data = { |
| 639 | "name": atom_name, |
| 640 | "status": result.status.value if result else "not_executed", |
| 641 | "execution_time": ( |
| 642 | f"{result.execution_time:.2f}s" |
| 643 | if result and result.execution_time |
| 644 | else "N/A" |
| 645 | ), |
| 646 | "attempts": result.attempts if result else 0, |
| 647 | "error": str(result.error) if result and result.error else None, |
| 648 | "dependencies": list(atom.dependencies), |
| 649 | "force_recompute": atom.force_recompute, |
| 650 | } |
| 651 | |
| 652 | self.graph.add_node(atom_name, **node_data) |
| 653 | |
| 654 | # Add edges for dependencies |
| 655 | for dep in atom.dependencies: |
| 656 | self.graph.add_edge(dep, atom_name) |
| 657 | |
| 658 | self._last_analysis_time = datetime.now() |
| 659 | return self.graph |
| 660 | |
| 661 | def get_critical_path(self) -> list[str]: |
no outgoing calls
no test coverage detected