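Load all nodes and CALLS/TESTED_BY edges into memory for fast traversal. Reads the entire ``nodes`` and ``edges`` tables in two streaming queries and returns an in-memory adjacency structure suitable for flow tracing and criticality scoring. At ~500k nodes / 3M edges this fits in a few hundred MB and eliminates tens of millions of single-row SQLite point queries that otherwise dominate ``trace_flows`` / ``compute_criticality`` runtime.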
(self)
| 1244 | return results |
| 1245 | |
def load_flow_adjacency(self) -> "FlowAdjacency":
    """Snapshot the graph into memory for flow tracing and criticality scoring.

    Two queries are issued on ``self._conn`` and consumed row by row:

    1. ``SELECT * FROM nodes`` -- every row is converted with
       ``self._row_to_node`` and indexed twice, by ``qualified_name`` and
       by ``id``. If two rows share a key, the later row wins.
    2. A filtered scan of ``edges`` restricted to the ``CALLS`` and
       ``TESTED_BY`` kinds.

    At ~500k nodes / 3M edges the result fits in a few hundred MB and
    replaces tens of millions of single-row SQLite point queries that
    otherwise dominate ``trace_flows`` / ``compute_criticality`` runtime.

    Returns:
        A ``FlowAdjacency`` whose fields are:

        * ``calls_out`` -- maps a ``source_qualified`` name to the list of
          ``target_qualified`` names of its ``CALLS`` edges, in the order
          the rows were returned (duplicates are kept).
        * ``has_tested_by`` -- the set of ``target_qualified`` names that
          appear on at least one ``TESTED_BY`` edge.
        * ``nodes_by_qn`` / ``nodes_by_id`` -- the two node indexes.
    """
    by_qualified_name: dict[str, GraphNode] = {}
    by_id: dict[int, GraphNode] = {}
    node_rows = self._conn.execute("SELECT * FROM nodes")
    for record in node_rows:
        graph_node = self._row_to_node(record)
        by_qualified_name[graph_node.qualified_name] = graph_node
        by_id[graph_node.id] = graph_node

    edge_sql = (
        "SELECT kind, source_qualified, target_qualified FROM edges "
        "WHERE kind IN ('CALLS', 'TESTED_BY')"
    )
    outgoing_calls: dict[str, list[str]] = {}
    tested_targets: set[str] = set()
    for record in self._conn.execute(edge_sql):
        edge_kind = record["kind"]
        source_name = record["source_qualified"]
        target_name = record["target_qualified"]

        if edge_kind != "CALLS":
            # The WHERE clause admits only two kinds, so this is TESTED_BY.
            tested_targets.add(target_name)
            continue

        callees = outgoing_calls.get(source_name)
        if callees is None:
            outgoing_calls[source_name] = [target_name]
        else:
            callees.append(target_name)

    return FlowAdjacency(
        calls_out=outgoing_calls,
        has_tested_by=tested_targets,
        nodes_by_qn=by_qualified_name,
        nodes_by_id=by_id,
    )
| 1281 | |
| 1282 | # --- Internal helpers --- |
| 1283 |
no test coverage detected