A routine that fetches the local neighborhoods of nodes from the distributed graph. The neighborhoods of some nodes are stored on the local machine, while the other nodes have their neighborhoods on remote machines. This code issues the remote access requests first, before fetching data from the local machine.
(
g, nodes, issue_remote_req, local_access, exclude_edges=None
)
| 735 | |
| 736 | |
| 737 | def _distributed_access( |
| 738 | g, nodes, issue_remote_req, local_access, exclude_edges=None |
| 739 | ): |
| 740 | """A routine that fetches local neighborhood of nodes from the distributed graph. |
| 741 | |
| 742 | The local neighborhood of some nodes are stored in the local machine and the other |
| 743 | nodes have their neighborhood on remote machines. This code will issue remote |
| 744 | access requests first before fetching data from the local machine. In the end, |
| 745 | we combine the data from the local machine and remote machines. |
| 746 | In this way, we can hide the latency of accessing data on remote machines. |
| 747 | |
| 748 | Parameters |
| 749 | ---------- |
| 750 | g : DistGraph |
| 751 | The distributed graph |
| 752 | nodes : tensor |
| 753 | The nodes whose neighborhood are to be fetched. |
| 754 | issue_remote_req : callable |
| 755 | The function that issues requests to access remote data. |
| 756 | local_access : callable |
| 757 | The function that reads data on the local machine. |
| 758 | exclude_edges : tensor |
| 759 | The edges to exclude after sampling. |
| 760 | |
| 761 | Returns |
| 762 | ------- |
| 763 | DGLGraph |
| 764 | The subgraph that contains the neighborhoods of all input nodes. |
| 765 | """ |
| 766 | req_list = [] |
| 767 | partition_book = g.get_partition_book() |
| 768 | if not isinstance(nodes, torch.Tensor): |
| 769 | nodes = toindex(nodes).tousertensor() |
| 770 | partition_id = partition_book.nid2partid(nodes) |
| 771 | local_nids = None |
| 772 | for pid in range(partition_book.num_partitions()): |
| 773 | node_id = F.boolean_mask(nodes, partition_id == pid) |
| 774 | # We optimize the sampling on a local partition if the server and the client |
| 775 | # run on the same machine. With a good partitioning, most of the seed nodes |
| 776 | # should reside in the local partition. If the server and the client |
| 777 | # are not co-located, the client doesn't have a local partition. |
| 778 | if pid == partition_book.partid and g.local_partition is not None: |
| 779 | assert local_nids is None |
| 780 | local_nids = node_id |
| 781 | elif len(node_id) != 0: |
| 782 | req = issue_remote_req(node_id) |
| 783 | req_list.append((pid, req)) |
| 784 | |
| 785 | # send requests to the remote machine. |
| 786 | msgseq2pos = None |
| 787 | if len(req_list) > 0: |
| 788 | msgseq2pos = send_requests_to_machine(req_list) |
| 789 | |
| 790 | # sample neighbors for the nodes in the local partition. |
| 791 | res_list = [] |
| 792 | if local_nids is not None: |
| 793 | res = local_access(g.local_partition, partition_book, local_nids) |
| 794 | res_list.append(res) |
no test coverage detected