MCPcopy
hub / github.com/dmlc/dgl / recv_responses

Function recv_responses

python/dgl/distributed/rpc.py:965–1013  ·  view source on GitHub ↗

Receive responses It returns the responses in the same order as the requests. The order of requests are stored in msgseq2pos. The operation is blocking -- it returns when it receives all responses or it times out. Parameters ---------- msgseq2pos : dict map the

(msgseq2pos, timeout=0)

Source from the content-addressed store, hash-verified

963
964
965def recv_responses(msgseq2pos, timeout=0):
966 """Receive responses
967
968 It returns the responses in the same order as the requests. The order of requests
969 are stored in msgseq2pos.
970
971 The operation is blocking -- it returns when it receives all responses
972 or it times out.
973
974 Parameters
975 ----------
976 msgseq2pos : dict
977 map the message sequence number to its position in the input list.
978 timeout : int, optional
979 The timeout value in milliseconds. If zero, wait indefinitely.
980
981 Returns
982 -------
983 list[Response]
984 Responses for each target-request pair. If the request does not have
985 response, None is placed.
986 """
987 myrank = get_rank()
988 size = np.max(list(msgseq2pos.values())) + 1
989 all_res = [None] * size
990 num_res = len(msgseq2pos)
991 while num_res != 0:
992 # recv response
993 msg = recv_rpc_message(timeout)
994 if msg is None:
995 raise DGLError(
996 f"Timed out for receiving message within {timeout} milliseconds"
997 )
998 num_res -= 1
999 _, res_cls = SERVICE_ID_TO_PROPERTY[msg.service_id]
1000 if res_cls is None:
1001 raise DGLError(
1002 "Got response message from service ID {}, "
1003 "but no response class is registered.".format(msg.service_id)
1004 )
1005 res = deserialize_from_payload(res_cls, msg.data, msg.tensors)
1006 if msg.client_id != myrank:
1007 raise DGLError(
1008 "Got reponse of request sent by client {}, "
1009 "different from my rank {}!".format(msg.client_id, myrank)
1010 )
1011 # set response
1012 all_res[msgseq2pos[msg.msg_seq]] = res
1013 return all_res
1014
1015
1016def remote_call_to_machine(target_and_requests, timeout=0):

Callers 4

remote_call_to_machineFunction · 0.85
_distributed_accessFunction · 0.85
_distributed_edge_accessFunction · 0.85

Calls 6

recv_rpc_messageFunction · 0.85
DGLErrorClass · 0.85
deserialize_from_payloadFunction · 0.85
formatMethod · 0.80
get_rankFunction · 0.70
valuesMethod · 0.45

Tested by

no test coverage detected