Run Leiden community detection. Returns {community_id: [node_ids]}. Community IDs are stable across runs: 0 = largest community after splitting. Oversized communities (> 25% of graph nodes, min 10) are split by running a second Leiden pass on the subgraph.
(G: nx.Graph)
| 25 | |
| 26 | |
| 27 | def cluster(G: nx.Graph) -> dict[int, list[str]]: |
| 28 | """Run Leiden community detection. Returns {community_id: [node_ids]}. |
| 29 | |
| 30 | Community IDs are stable across runs: 0 = largest community after splitting. |
| 31 | Oversized communities (> 25% of graph nodes, min 10) are split by running |
| 32 | a second Leiden pass on the subgraph. |
| 33 | """ |
| 34 | if G.number_of_nodes() == 0: |
| 35 | return {} |
| 36 | if G.number_of_edges() == 0: |
| 37 | return {i: [n] for i, n in enumerate(sorted(G.nodes))} |
| 38 | |
| 39 | from graspologic.partition import leiden # lazy - avoids 15s numba JIT on import |
| 40 | |
| 41 | # Leiden warns and drops isolates - handle them separately |
| 42 | isolates = [n for n in G.nodes() if G.degree(n) == 0] |
| 43 | connected_nodes = [n for n in G.nodes() if G.degree(n) > 0] |
| 44 | connected = G.subgraph(connected_nodes) |
| 45 | |
| 46 | raw: dict[int, list[str]] = {} |
| 47 | if connected.number_of_nodes() > 0: |
| 48 | partition: dict[str, int] = leiden(connected) |
| 49 | for node, cid in partition.items(): |
| 50 | raw.setdefault(cid, []).append(node) |
| 51 | |
| 52 | # Each isolate becomes its own single-node community |
| 53 | next_cid = max(raw.keys(), default=-1) + 1 |
| 54 | for node in isolates: |
| 55 | raw[next_cid] = [node] |
| 56 | next_cid += 1 |
| 57 | |
| 58 | # Split oversized communities |
| 59 | max_size = max(_MIN_SPLIT_SIZE, int(G.number_of_nodes() * _MAX_COMMUNITY_FRACTION)) |
| 60 | final_communities: list[list[str]] = [] |
| 61 | for nodes in raw.values(): |
| 62 | if len(nodes) > max_size: |
| 63 | final_communities.extend(_split_community(G, nodes)) |
| 64 | else: |
| 65 | final_communities.append(nodes) |
| 66 | |
| 67 | # Re-index by size descending for deterministic ordering |
| 68 | final_communities.sort(key=len, reverse=True) |
| 69 | return {i: sorted(nodes) for i, nodes in enumerate(final_communities)} |
| 70 | |
| 71 | |
| 72 | def _split_community(G: nx.Graph, nodes: list[str]) -> list[list[str]]: |
nothing calls this directly
no test coverage detected