MCPcopy
hub / github.com/dmlc/dgl / gen_edge_files

Function gen_edge_files

tools/distpartitioning/parmetis_preprocess.py:67–156  ·  view source on GitHub ↗

Function to create edges files to be consumed by ParMETIS for partitioning purposes. This function creates the edge files and each of these will have the following format (meaning each line of these file is of the following format) Here ``global`

(rank, schema_map, params)

Source from the content-addressed store, hash-verified

65
66
67def gen_edge_files(rank, schema_map, params):
68 """Function to create edges files to be consumed by ParMETIS
69 for partitioning purposes.
70
71 This function creates the edge files and each of these will have the
72 following format (meaning each line of these file is of the following format)
73 <global_src_id> <global_dst_id>
74
75 Here ``global`` prefix means that globally unique identifier assigned each node
76 in the input graph. In this context globally unique means unique across all the
77 nodes in the input graph.
78
79 Parameters:
80 -----------
81 rank : int
82 rank of the current process
83 schema_map : json dictionary
84 Dictionary created by reading the metadata.json file for the input dataset.
85 output : string
86 Location of storing the node-weights and edge files for ParMETIS.
87 """
88 _, ntype_gnid_offset = get_idranges(
89 schema_map[constants.STR_NODE_TYPE],
90 dict(
91 zip(
92 schema_map[constants.STR_NODE_TYPE],
93 schema_map[constants.STR_NUM_NODES_PER_TYPE],
94 )
95 ),
96 )
97
98 # Regenerate edge files here.
99 edge_data = schema_map[constants.STR_EDGES]
100
101 outdir = Path(params.output_dir)
102 os.makedirs(outdir, exist_ok=True)
103
104 def process_and_write_back(data_df, idx):
105 data_f0 = data_df[:, 0]
106 data_f1 = data_df[:, 1]
107
108 global_src_id = data_f0 + ntype_gnid_offset[src_ntype_name][0, 0]
109 global_dst_id = data_f1 + ntype_gnid_offset[dst_ntype_name][0, 0]
110 cols = [global_src_id, global_dst_id]
111 col_names = ["global_src_id", "global_dst_id"]
112
113 out_file_name = Path(edge_data_files[idx]).stem.split(".")[0]
114 out_file = os.path.join(
115 outdir, etype_name, f"edges_{out_file_name}.csv"
116 )
117 os.makedirs(os.path.dirname(out_file), exist_ok=True)
118
119 options = csv.WriteOptions(include_header=False, delimiter=" ")
120 csv.write_csv(
121 pyarrow.Table.from_arrays(cols, names=col_names),
122 out_file,
123 options,
124 )

Callers 2

test_gen_edge_filesFunction · 0.90
run_preprocess_dataFunction · 0.85

Calls 7

get_idrangesFunction · 0.90
process_and_write_backFunction · 0.85
appendMethod · 0.80
itemsMethod · 0.45
readMethod · 0.45
joinMethod · 0.45

Tested by 1

test_gen_edge_filesFunction · 0.72