MCPcopy Index your code
hub / github.com/tirth8205/code-review-graph / _detect_file_based

Function _detect_file_based

code_review_graph/communities.py:350–433  ·  view source on GitHub ↗

Group nodes by directory when Leiden is unavailable or over-fragments the graph. Strips the longest common directory prefix from all file paths, then adaptively picks a grouping depth that yields 10–200 communities.

(
    nodes: list[GraphNode],
    edges: list[GraphEdge],
    min_size: int,
    adj: dict[str, list[str]] | None = None,
)

Source from the content-addressed store, hash-verified

348
349
350def _detect_file_based(
351 nodes: list[GraphNode],
352 edges: list[GraphEdge],
353 min_size: int,
354 adj: dict[str, list[str]] | None = None,
355) -> list[dict[str, Any]]:
356 """Group nodes by directory when Leiden is unavailable or over-fragments.
357
358 Strips the longest common directory prefix from all file paths, then
359 adaptively picks a grouping depth that yields 10-200 communities.
360 """
361 # Collect all directory paths (normalized, without filename)
362 all_dir_parts: list[list[str]] = []
363 for n in nodes:
364 parts = n.file_path.replace("\\", "/").split("/")
365 all_dir_parts.append([p for p in parts[:-1] if p])
366
367 # Find the longest common prefix among directory parts
368 prefix_len = 0
369 if all_dir_parts:
370 shortest = min(len(p) for p in all_dir_parts)
371 for i in range(shortest):
372 seg = all_dir_parts[0][i]
373 if all(p[i] == seg for p in all_dir_parts):
374 prefix_len = i + 1
375 else:
376 break
377
378 def _group_at_depth(depth: int) -> dict[str, list[GraphNode]]:
379 groups: dict[str, list[GraphNode]] = defaultdict(list)
380 for n in nodes:
381 parts = n.file_path.replace("\\", "/").split("/")
382 dir_parts = [p for p in parts[:-1] if p]
383 remainder = dir_parts[prefix_len:]
384 if remainder:
385 key = "/".join(remainder[:depth])
386 else:
387 key = parts[-1].rsplit(".", 1)[0] if parts else "root"
388 groups[key].append(n)
389 return groups
390
391 # Try increasing depths until we get 10-200 qualifying groups
392 max_depth = max((len(p) - prefix_len for p in all_dir_parts), default=0)
393 best_groups = _group_at_depth(1) # depth=1 always works (file stem fallback)
394 for depth in range(1, max_depth + 1):
395 groups = _group_at_depth(depth)
396 qualifying = sum(1 for v in groups.values() if len(v) >= min_size)
397 best_groups = groups
398 if qualifying >= 10:
399 break
400
401 by_dir = best_groups
402
403 # Pre-filter to communities meeting min_size and collect their member
404 # sets so we can batch-compute all cohesions in a single O(edges) pass.
405 # Without this, per-community cohesion is O(edges * files), which makes
406 # community detection effectively hang on large repos.
407 pending: list[tuple[str, list[GraphNode], set[str]]] = []

Callers 4

_detect_leiden · Function · 0.85
detect_communities · Function · 0.85

Calls 3

_group_at_depth · Function · 0.85
_compute_cohesion_batch · Function · 0.85
_generate_community_name · Function · 0.85