(
g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt
)
| 28 | |
| 29 | |
| 30 | def _chunk_graph( |
| 31 | g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt |
| 32 | ): |
| 33 | # First deal with ndata and edata that are homogeneous (i.e. not a dict-of-dict) |
| 34 | if len(g.ntypes) == 1 and not isinstance( |
| 35 | next(iter(ndata_paths.values())), dict |
| 36 | ): |
| 37 | ndata_paths = {g.ntypes[0]: ndata_paths} |
| 38 | if len(g.etypes) == 1 and not isinstance( |
| 39 | next(iter(edata_paths.values())), dict |
| 40 | ): |
| 41 | edata_paths = {g.etypes[0]: ndata_paths} |
| 42 | # Then convert all edge types to canonical edge types |
| 43 | etypestrs = {etype: ":".join(etype) for etype in g.canonical_etypes} |
| 44 | edata_paths = { |
| 45 | ":".join(g.to_canonical_etype(k)): v for k, v in edata_paths.items() |
| 46 | } |
| 47 | |
| 48 | metadata = {} |
| 49 | |
| 50 | metadata["graph_name"] = name |
| 51 | metadata["node_type"] = g.ntypes |
| 52 | |
| 53 | # Compute the number of nodes per chunk per node type |
| 54 | metadata["num_nodes_per_chunk"] = num_nodes_per_chunk = [] |
| 55 | for ntype in g.ntypes: |
| 56 | num_nodes = g.num_nodes(ntype) |
| 57 | num_nodes_list = [] |
| 58 | for i in range(num_chunks): |
| 59 | n = num_nodes // num_chunks + (i < num_nodes % num_chunks) |
| 60 | num_nodes_list.append(n) |
| 61 | num_nodes_per_chunk.append(num_nodes_list) |
| 62 | num_nodes_per_chunk_dict = { |
| 63 | k: v for k, v in zip(g.ntypes, num_nodes_per_chunk) |
| 64 | } |
| 65 | |
| 66 | metadata["edge_type"] = [etypestrs[etype] for etype in g.canonical_etypes] |
| 67 | |
| 68 | # Compute the number of edges per chunk per edge type |
| 69 | metadata["num_edges_per_chunk"] = num_edges_per_chunk = [] |
| 70 | for etype in g.canonical_etypes: |
| 71 | num_edges = g.num_edges(etype) |
| 72 | num_edges_list = [] |
| 73 | for i in range(num_chunks): |
| 74 | n = num_edges // num_chunks + (i < num_edges % num_chunks) |
| 75 | num_edges_list.append(n) |
| 76 | num_edges_per_chunk.append(num_edges_list) |
| 77 | num_edges_per_chunk_dict = { |
| 78 | k: v for k, v in zip(g.canonical_etypes, num_edges_per_chunk) |
| 79 | } |
| 80 | |
| 81 | # Split edge index |
| 82 | metadata["edges"] = {} |
| 83 | with setdir("edge_index"): |
| 84 | for etype in g.canonical_etypes: |
| 85 | etypestr = etypestrs[etype] |
| 86 | logging.info("Chunking edge index for %s" % etypestr) |
| 87 | edges_meta = {} |
no test coverage detected