Trace execution flows from every entry point via forward BFS. Returns a list of flow dicts, each containing: - name: human-readable flow name (entry point name) - entry_point: qualified name of the entry point - entry_point_id: node database id of the entry point - path:
(
store: GraphStore,
max_depth: int = 15,
include_tests: bool = False,
)
| 266 | |
| 267 | |
| 268 | def trace_flows( |
| 269 | store: GraphStore, |
| 270 | max_depth: int = 15, |
| 271 | include_tests: bool = False, |
| 272 | ) -> list[dict]: |
| 273 | """Trace execution flows from every entry point via forward BFS. |
| 274 | |
| 275 | Returns a list of flow dicts, each containing: |
| 276 | - name: human-readable flow name (entry point name) |
| 277 | - entry_point: qualified name of the entry point |
| 278 | - entry_point_id: node database id of the entry point |
| 279 | - path: ordered list of node IDs in the flow |
| 280 | - depth: maximum BFS depth reached |
| 281 | - node_count: number of distinct nodes in the path |
| 282 | - file_count: number of distinct files touched |
| 283 | - files: list of distinct file paths |
| 284 | - criticality: computed criticality score (0.0-1.0) |
| 285 | """ |
| 286 | entry_points = detect_entry_points(store, include_tests=include_tests) |
| 287 | if not entry_points: |
| 288 | return [] |
| 289 | |
| 290 | adj = store.load_flow_adjacency() |
| 291 | flows: list[dict] = [] |
| 292 | |
| 293 | for ep in entry_points: |
| 294 | flow = _trace_single_flow(adj, ep, max_depth) |
| 295 | if flow is not None: |
| 296 | flows.append(flow) |
| 297 | |
| 298 | # Sort by criticality descending. |
| 299 | flows.sort(key=lambda f: f["criticality"], reverse=True) |
| 300 | return flows |
| 301 | |
| 302 | |
| 303 | # --------------------------------------------------------------------------- |