(g, n, issue_remote_req, local_access)
| 1306 | |
| 1307 | |
| 1308 | def _distributed_get_node_property(g, n, issue_remote_req, local_access): |
| 1309 | req_list = [] |
| 1310 | partition_book = g.get_partition_book() |
| 1311 | n = toindex(n).tousertensor() |
| 1312 | partition_id = partition_book.nid2partid(n) |
| 1313 | local_nids = None |
| 1314 | reorder_idx = [] |
| 1315 | for pid in range(partition_book.num_partitions()): |
| 1316 | mask = partition_id == pid |
| 1317 | nid = F.boolean_mask(n, mask) |
| 1318 | reorder_idx.append(F.nonzero_1d(mask)) |
| 1319 | if pid == partition_book.partid and g.local_partition is not None: |
| 1320 | assert local_nids is None |
| 1321 | local_nids = nid |
| 1322 | elif len(nid) != 0: |
| 1323 | req = issue_remote_req(nid, pid) |
| 1324 | req_list.append((pid, req)) |
| 1325 | |
| 1326 | # send requests to the remote machine. |
| 1327 | msgseq2pos = None |
| 1328 | if len(req_list) > 0: |
| 1329 | msgseq2pos = send_requests_to_machine(req_list) |
| 1330 | |
| 1331 | # handle edges in local partition. |
| 1332 | vals = None |
| 1333 | if local_nids is not None: |
| 1334 | local_vals = local_access(g.local_partition, partition_book, local_nids) |
| 1335 | shape = list(F.shape(local_vals)) |
| 1336 | shape[0] = len(n) |
| 1337 | vals = F.zeros(shape, F.dtype(local_vals), F.cpu()) |
| 1338 | vals = F.scatter_row( |
| 1339 | vals, reorder_idx[partition_book.partid], local_vals |
| 1340 | ) |
| 1341 | |
| 1342 | # receive responses from remote machines. |
| 1343 | if msgseq2pos is not None: |
| 1344 | results = recv_responses(msgseq2pos) |
| 1345 | if len(results) > 0 and vals is None: |
| 1346 | shape = list(F.shape(results[0].val)) |
| 1347 | shape[0] = len(n) |
| 1348 | vals = F.zeros(shape, F.dtype(results[0].val), F.cpu()) |
| 1349 | for result in results: |
| 1350 | val = result.val |
| 1351 | vals = F.scatter_row(vals, reorder_idx[result.order_id], val) |
| 1352 | return vals |
| 1353 | |
| 1354 | |
| 1355 | def in_degrees(g, v): |
no test coverage detected