Trace execution flows from entry points.
(
store: GraphStore,
result: dict[str, Any],
warnings: list[str],
)
| 101 | |
| 102 | |
def _trace_flows(
    store: GraphStore,
    result: dict[str, Any],
    warnings: list[str],
) -> None:
    """Trace execution flows from entry points and record how many were stored.

    Calls ``trace_flows(store)``, passes the traced flows to ``store_flows``,
    and writes the value it returns to ``result["flows_detected"]``.

    This step is best-effort: if the ``.flows`` module cannot be imported, or
    a ``sqlite3.OperationalError`` is raised along the way, the failure is
    logged, a description is appended to ``warnings``, and the function
    returns without setting ``result["flows_detected"]``. Any other exception
    propagates to the caller.

    Args:
        store: Graph store handed to ``trace_flows`` and ``store_flows``.
        result: Mutable summary dict; receives the ``"flows_detected"`` key
            on success.
        warnings: Mutable list that collects human-readable failure messages.
    """
    try:
        # Imported lazily so that an ImportError here is caught below and
        # reported as a warning instead of propagating.
        from .flows import store_flows, trace_flows

        result["flows_detected"] = store_flows(store, trace_flows(store))
    except (sqlite3.OperationalError, ImportError) as exc:
        logger.warning("Flow detection failed: %s", exc)
        warnings.append(f"Flow detection failed: {type(exc).__name__}: {exc}")
| 118 | |
| 119 | |
| 120 | def _detect_communities( |
no test coverage detected