(partitions_dir, ntypes, gid_ranges, world_size)
| 120 | |
| 121 | |
| 122 | def _prepare_test_data(partitions_dir, ntypes, gid_ranges, world_size): |
| 123 | # read node-id to partition-id mappings from disk |
| 124 | ntype_partids = [] |
| 125 | for ntype_id, ntype in enumerate(ntypes): |
| 126 | filename = f"{ntype}.txt" |
| 127 | assert os.path.isfile(os.path.join(partitions_dir, filename)) |
| 128 | |
| 129 | read_options = pyarrow.csv.ReadOptions( |
| 130 | use_threads=True, |
| 131 | block_size=4096, |
| 132 | autogenerate_column_names=True, |
| 133 | ) |
| 134 | parse_options = pyarrow.csv.ParseOptions(delimiter=" ") |
| 135 | |
| 136 | with pyarrow.csv.open_csv( |
| 137 | os.path.join(partitions_dir, "{}.txt".format(ntype)), |
| 138 | read_options=read_options, |
| 139 | parse_options=parse_options, |
| 140 | ) as reader: |
| 141 | for next_chunk in reader: |
| 142 | if next_chunk is None: |
| 143 | break |
| 144 | next_table = pyarrow.Table.from_batches([next_chunk]) |
| 145 | ntype_partids.append(next_table["f0"].to_numpy()) |
| 146 | |
| 147 | # prepare test data for each rank here |
| 148 | # key = f'rank-{rank}' |
| 149 | # value is a list of tuples: [(method-name, request, response)] |
| 150 | test_data = {} |
| 151 | for rank in range(world_size): |
| 152 | ntype_id = np.random.randint(0, len(ntypes) - 1) |
| 153 | ntype = ntypes[ntype_id] |
| 154 | request = ( |
| 155 | np.arange(len(ntype_partids[ntype_id])) |
| 156 | + gid_ranges[ntypes[ntype_id]][0, 0] |
| 157 | ) |
| 158 | response = ntype_partids[ntype_id] |
| 159 | |
| 160 | test_data[f"rank-{rank}"] = [("getpartitionids", request, response)] |
| 161 | |
| 162 | # randomly shuffle the global-nids and retrieve their partition-ids. |
| 163 | for rank in range(world_size): |
| 164 | ntype_id = np.random.randint(0, len(ntypes) - 1) |
| 165 | ntype = ntypes[ntype_id] |
| 166 | idx = np.arange(len(ntype_partids[ntype_id])) |
| 167 | request = idx + gid_ranges[ntypes[ntype_id]][0, 0] |
| 168 | |
| 169 | np.random.shuffle(idx) |
| 170 | request = request[idx] |
| 171 | response = ntype_partids[ntype_id][idx] |
| 172 | |
| 173 | test_data[f"rank-{rank}"] = [("getpartitionids", request, response)] |
| 174 | |
| 175 | # one final test |
| 176 | # mix all the ntypes and shuffle randomly |
| 177 | request = [] |
| 178 | response = [] |
| 179 | for idx in range(len(ntype_partids)): |
no test coverage detected