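Function to create edge files to be consumed by ParMETIS for partitioning purposes. This function creates the edge files, and each line of each of these files has the format `<global_src_id> <global_dst_id>`. Here the ``global`` prefix denotes the globally unique identifier assigned to each node in the input graph; in this context, globally unique means unique across all the nodes in the input graph.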
(rank, schema_map, params)
| 65 | |
| 66 | |
| 67 | def gen_edge_files(rank, schema_map, params): |
| 68 | """Function to create edge files to be consumed by ParMETIS |
| 69 | for partitioning purposes. |
| 70 | |
| 71 | This function creates the edge files, and each line of each of these |
| 72 | files has the following format: |
| 73 | <global_src_id> <global_dst_id> |
| 74 | |
| 75 | Here the ``global`` prefix denotes the globally unique identifier assigned to each node |
| 76 | in the input graph. In this context, globally unique means unique across all the |
| 77 | nodes in the input graph. |
| 78 | |
| 79 | Parameters: |
| 80 | ----------- |
| 81 | rank : int |
| 82 | rank of the current process |
| 83 | schema_map : json dictionary |
| 84 | Dictionary created by reading the metadata.json file for the input dataset. |
| 85 | params : object |
| 86 | Arguments object whose ``output_dir`` attribute is the location for storing the node-weights and edge files for ParMETIS. |
| 87 | """ |
| 88 | _, ntype_gnid_offset = get_idranges( |
| 89 | schema_map[constants.STR_NODE_TYPE], |
| 90 | dict( |
| 91 | zip( |
| 92 | schema_map[constants.STR_NODE_TYPE], |
| 93 | schema_map[constants.STR_NUM_NODES_PER_TYPE], |
| 94 | ) |
| 95 | ), |
| 96 | ) |
| 97 | |
| 98 | # Regenerate edge files here. |
| 99 | edge_data = schema_map[constants.STR_EDGES] |
| 100 | |
| 101 | outdir = Path(params.output_dir) |
| 102 | os.makedirs(outdir, exist_ok=True) |
| 103 | |
| 104 | def process_and_write_back(data_df, idx): |
| 105 | data_f0 = data_df[:, 0] |
| 106 | data_f1 = data_df[:, 1] |
| 107 | |
| 108 | global_src_id = data_f0 + ntype_gnid_offset[src_ntype_name][0, 0] |
| 109 | global_dst_id = data_f1 + ntype_gnid_offset[dst_ntype_name][0, 0] |
| 110 | cols = [global_src_id, global_dst_id] |
| 111 | col_names = ["global_src_id", "global_dst_id"] |
| 112 | |
| 113 | out_file_name = Path(edge_data_files[idx]).stem.split(".")[0] |
| 114 | out_file = os.path.join( |
| 115 | outdir, etype_name, f"edges_{out_file_name}.csv" |
| 116 | ) |
| 117 | os.makedirs(os.path.dirname(out_file), exist_ok=True) |
| 118 | |
| 119 | options = csv.WriteOptions(include_header=False, delimiter=" ") |
| 120 | csv.write_csv( |
| 121 | pyarrow.Table.from_arrays(cols, names=col_names), |
| 122 | out_file, |
| 123 | options, |
| 124 | ) |