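Group nodes by directory when Leiden is unavailable or over-fragments the graph. Strips the longest common directory prefix from all file paths, then adaptively picks a grouping depth, aiming for 10-200 communities: depths are tried from shallowest to deepest, stopping at the first depth that yields at least 10 groups containing `min_size` or more nodes (or at the deepest available depth if none does).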
(
nodes: list[GraphNode],
edges: list[GraphEdge],
min_size: int,
adj: dict[str, list[str]] | None = None,
)
| 348 | |
| 349 | |
| 350 | def _detect_file_based( |
| 351 | nodes: list[GraphNode], |
| 352 | edges: list[GraphEdge], |
| 353 | min_size: int, |
| 354 | adj: dict[str, list[str]] | None = None, |
| 355 | ) -> list[dict[str, Any]]: |
| 356 | """Group nodes by directory when Leiden is unavailable or over-fragments. |
| 357 | |
| 358 | Strips the longest common directory prefix from all file paths, then |
| 359 | adaptively picks a grouping depth that yields 10-200 communities. |
| 360 | """ |
| 361 | # Collect all directory paths (normalized, without filename) |
| 362 | all_dir_parts: list[list[str]] = [] |
| 363 | for n in nodes: |
| 364 | parts = n.file_path.replace("\\", "/").split("/") |
| 365 | all_dir_parts.append([p for p in parts[:-1] if p]) |
| 366 | |
| 367 | # Find the longest common prefix among directory parts |
| 368 | prefix_len = 0 |
| 369 | if all_dir_parts: |
| 370 | shortest = min(len(p) for p in all_dir_parts) |
| 371 | for i in range(shortest): |
| 372 | seg = all_dir_parts[0][i] |
| 373 | if all(p[i] == seg for p in all_dir_parts): |
| 374 | prefix_len = i + 1 |
| 375 | else: |
| 376 | break |
| 377 | |
| 378 | def _group_at_depth(depth: int) -> dict[str, list[GraphNode]]: |
| 379 | groups: dict[str, list[GraphNode]] = defaultdict(list) |
| 380 | for n in nodes: |
| 381 | parts = n.file_path.replace("\\", "/").split("/") |
| 382 | dir_parts = [p for p in parts[:-1] if p] |
| 383 | remainder = dir_parts[prefix_len:] |
| 384 | if remainder: |
| 385 | key = "/".join(remainder[:depth]) |
| 386 | else: |
| 387 | key = parts[-1].rsplit(".", 1)[0] if parts else "root" |
| 388 | groups[key].append(n) |
| 389 | return groups |
| 390 | |
| 391 | # Try increasing depths until we get 10-200 qualifying groups |
| 392 | max_depth = max((len(p) - prefix_len for p in all_dir_parts), default=0) |
| 393 | best_groups = _group_at_depth(1) # depth=1 always works (file stem fallback) |
| 394 | for depth in range(1, max_depth + 1): |
| 395 | groups = _group_at_depth(depth) |
| 396 | qualifying = sum(1 for v in groups.values() if len(v) >= min_size) |
| 397 | best_groups = groups |
| 398 | if qualifying >= 10: |
| 399 | break |
| 400 | |
| 401 | by_dir = best_groups |
| 402 | |
| 403 | # Pre-filter to communities meeting min_size and collect their member |
| 404 | # sets so we can batch-compute all cohesions in a single O(edges) pass. |
| 405 | # Without this, per-community cohesion is O(edges * files), which makes |
| 406 | # community detection effectively hang on large repos. |
| 407 | pending: list[tuple[str, list[GraphNode], set[str]]] = [] |