hub / github.com/dmlc/dgl / remote_call

Function remote_call

python/dgl/distributed/rpc.py:839–916

Invoke registered services on remote servers and collect responses. The operation is blocking -- it returns when it receives all responses or it times out. If the target server state is available locally, it invokes local computation to calculate the response.

(target_and_requests, timeout=0)
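The timeout convention in the signature (milliseconds, with zero meaning "wait indefinitely") can be sketched with the standard library alone. This is a minimal illustration, not DGL code: `recv_with_timeout` and the `inbox` queue are hypothetical names, and raising `TimeoutError` is an assumption, since what the library raises on timeout is not shown in this listing.

```python
import queue


def recv_with_timeout(inbox, timeout_ms=0):
    """Hypothetical helper: block on `inbox` using the documented convention
    (timeout in milliseconds; zero means wait indefinitely)."""
    # queue.Queue.get expects seconds, and None means "block forever".
    timeout_s = None if timeout_ms == 0 else timeout_ms / 1000.0
    try:
        return inbox.get(timeout=timeout_s)
    except queue.Empty:
        # Assumption: surface the timeout as a TimeoutError.
        raise TimeoutError(f"no response within {timeout_ms} ms") from None


inbox = queue.Queue()
inbox.put("response-0")

# A message is already queued, so this returns without waiting.
first = recv_with_timeout(inbox, timeout_ms=50)

# The queue is now empty, so this call gives up after about 50 ms.
try:
    recv_with_timeout(inbox, timeout_ms=50)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Mapping zero to `None` keeps the default blocking, matching the `timeout=0` default above.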


def remote_call(target_and_requests, timeout=0):
    """Invoke registered services on remote servers and collect responses.

    The operation is blocking -- it returns when it receives all responses
    or it times out.

    If the target server state is available locally, it invokes local computation
    to calculate the response.

    Parameters
    ----------
    target_and_requests : list[(int, Request)]
        A list of requests and the server they should be sent to.
    timeout : int, optional
        The timeout value in milliseconds. If zero, wait indefinitely.

    Returns
    -------
    list[Response]
        Responses for each target-request pair. If the request does not have
        response, None is placed.

    Raises
    ------
    ConnectionError if there is any problem with the connection.
    """
    all_res = [None] * len(target_and_requests)
    msgseq2pos = {}
    num_res = 0
    myrank = get_rank()
    for pos, (target, request) in enumerate(target_and_requests):
        # send request
        service_id = request.service_id
        msg_seq = incr_msg_seq()
        client_id = get_rank()
        server_id = random.randint(
            target * get_num_server_per_machine(),
            (target + 1) * get_num_server_per_machine() - 1,
        )
        data, tensors = serialize_to_payload(request)
        msg = RPCMessage(
            service_id,
            msg_seq,
            client_id,
            server_id,
            data,
            tensors,
            get_group_id(),
        )
        send_rpc_message(msg, server_id)
        # check if has response
        res_cls = get_service_property(service_id)[1]
        if res_cls is not None:
            num_res += 1
            msgseq2pos[msg_seq] = pos
    while num_res != 0:
        # recv response
        msg = recv_rpc_message(timeout)
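The bookkeeping in the listing (one random server per target machine, a `msgseq2pos` map, and a `num_res` counter) can be modeled without any networking. The sketch below is a toy under stated assumptions: `NUM_SERVERS_PER_MACHINE`, `pick_server`, `toy_remote_call`, the sequence numbers, and the `echo:` payloads are all hypothetical. The listing is cut off right after `recv_rpc_message(timeout)`, so the body of the collection loop here is a guess at the general pattern, not the library's code.

```python
import random

NUM_SERVERS_PER_MACHINE = 2  # assumed value, for illustration only


def pick_server(target, rng):
    """Pick one server ID from the target machine's contiguous ID range."""
    lo = target * NUM_SERVERS_PER_MACHINE
    hi = (target + 1) * NUM_SERVERS_PER_MACHINE - 1
    return rng.randint(lo, hi)  # randint includes both endpoints


def toy_remote_call(target_and_requests, expects_response, seed=0):
    """Toy model of the send/collect bookkeeping; nothing is sent anywhere."""
    rng = random.Random(seed)
    all_res = [None] * len(target_and_requests)
    msgseq2pos = {}
    in_flight = []  # stands in for the network
    for pos, (target, request) in enumerate(target_and_requests):
        msg_seq = 100 + pos  # hypothetical sequence numbers
        server_id = pick_server(target, rng)
        if expects_response(request):
            in_flight.append((msg_seq, server_id, f"echo:{request}"))
            msgseq2pos[msg_seq] = pos
    rng.shuffle(in_flight)  # responses may arrive in any order
    num_res = len(msgseq2pos)
    while num_res != 0:
        msg_seq, _server_id, payload = in_flight.pop()
        # The sequence number routes the response back to its request slot.
        all_res[msgseq2pos[msg_seq]] = payload
        num_res -= 1
    return all_res


requests = [(0, "a"), (1, "fire-and-forget"), (0, "c")]
results = toy_remote_call(requests, lambda r: r != "fire-and-forget")
```

Because each response is placed by sequence number, `results` lines up with the input order regardless of arrival order, and the request that expects no response keeps its `None` slot, as the docstring describes.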

Callers

nothing calls this directly

Calls 12

incr_msg_seq · Function · 0.85
serialize_to_payload · Function · 0.85
RPCMessage · Class · 0.85
get_group_id · Function · 0.85
send_rpc_message · Function · 0.85
get_service_property · Function · 0.85
recv_rpc_message · Function · 0.85
DGLError · Class · 0.85
deserialize_from_payload · Function · 0.85
format · Method · 0.80
get_rank · Function · 0.70

Tested by

no test coverage detected