MCPcopy
hub / github.com/dmlc/dgl / _prepare_test_data

Function _prepare_test_data

tests/tools/test_dist_lookup.py:122–195  ·  view source on GitHub ↗
(partitions_dir, ntypes, gid_ranges, world_size)

Source from the content-addressed store, hash-verified

120
121
122def _prepare_test_data(partitions_dir, ntypes, gid_ranges, world_size):
123 # read node-id to partition-id mappings from disk
124 ntype_partids = []
125 for ntype_id, ntype in enumerate(ntypes):
126 filename = f"{ntype}.txt"
127 assert os.path.isfile(os.path.join(partitions_dir, filename))
128
129 read_options = pyarrow.csv.ReadOptions(
130 use_threads=True,
131 block_size=4096,
132 autogenerate_column_names=True,
133 )
134 parse_options = pyarrow.csv.ParseOptions(delimiter=" ")
135
136 with pyarrow.csv.open_csv(
137 os.path.join(partitions_dir, "{}.txt".format(ntype)),
138 read_options=read_options,
139 parse_options=parse_options,
140 ) as reader:
141 for next_chunk in reader:
142 if next_chunk is None:
143 break
144 next_table = pyarrow.Table.from_batches([next_chunk])
145 ntype_partids.append(next_table["f0"].to_numpy())
146
147 # prepare test data for each rank here
148 # key = f'rank-{rank}'
149 # value is a list of tuple [(method-name, request, response)]
150 test_data = {}
151 for rank in range(world_size):
152 ntype_id = np.random.randint(0, len(ntypes) - 1)
153 ntype = ntypes[ntype_id]
154 request = (
155 np.arange(len(ntype_partids[ntype_id]))
156 + gid_ranges[ntypes[ntype_id]][0, 0]
157 )
158 response = ntype_partids[ntype_id]
159
160 test_data[f"rank-{rank}"] = [("getpartitionids", request, response)]
161
162 # randomly shuffle the global-nids and retrieve their partition-ids.
163 for rank in range(world_size):
164 ntype_id = np.random.randint(0, len(ntypes) - 1)
165 ntype = ntypes[ntype_id]
166 idx = np.arange(len(ntype_partids[ntype_id]))
167 request = idx + gid_ranges[ntypes[ntype_id]][0, 0]
168
169 np.random.shuffle(idx)
170 request = request[idx]
171 response = ntype_partids[ntype_id][idx]
172
173 test_data[f"rank-{rank}"] = [("getpartitionids", request, response)]
174
175 # one final test
176 # mix all the ntypes and shuffle randomly
177 request = []
178 response = []
179 for idx in range(len(ntype_partids)):

Callers 1

test_lookup_service · Function · 0.85

Calls 4

format · Method · 0.80
append · Method · 0.80
join · Method · 0.45
shuffle · Method · 0.45

Tested by

no test coverage detected