Receive responses It returns the responses in the same order as the requests. The order of requests are stored in msgseq2pos. The operation is blocking -- it returns when it receives all responses or it times out. Parameters ---------- msgseq2pos : dict map the
(msgseq2pos, timeout=0)
| 963 | |
| 964 | |
| 965 | def recv_responses(msgseq2pos, timeout=0): |
| 966 | """Receive responses |
| 967 | |
| 968 | It returns the responses in the same order as the requests. The order of requests |
| 969 | are stored in msgseq2pos. |
| 970 | |
| 971 | The operation is blocking -- it returns when it receives all responses |
| 972 | or it times out. |
| 973 | |
| 974 | Parameters |
| 975 | ---------- |
| 976 | msgseq2pos : dict |
| 977 | map the message sequence number to its position in the input list. |
| 978 | timeout : int, optional |
| 979 | The timeout value in milliseconds. If zero, wait indefinitely. |
| 980 | |
| 981 | Returns |
| 982 | ------- |
| 983 | list[Response] |
| 984 | Responses for each target-request pair. If the request does not have |
| 985 | response, None is placed. |
| 986 | """ |
| 987 | myrank = get_rank() |
| 988 | size = np.max(list(msgseq2pos.values())) + 1 |
| 989 | all_res = [None] * size |
| 990 | num_res = len(msgseq2pos) |
| 991 | while num_res != 0: |
| 992 | # recv response |
| 993 | msg = recv_rpc_message(timeout) |
| 994 | if msg is None: |
| 995 | raise DGLError( |
| 996 | f"Timed out for receiving message within {timeout} milliseconds" |
| 997 | ) |
| 998 | num_res -= 1 |
| 999 | _, res_cls = SERVICE_ID_TO_PROPERTY[msg.service_id] |
| 1000 | if res_cls is None: |
| 1001 | raise DGLError( |
| 1002 | "Got response message from service ID {}, " |
| 1003 | "but no response class is registered.".format(msg.service_id) |
| 1004 | ) |
| 1005 | res = deserialize_from_payload(res_cls, msg.data, msg.tensors) |
| 1006 | if msg.client_id != myrank: |
| 1007 | raise DGLError( |
| 1008 | "Got reponse of request sent by client {}, " |
| 1009 | "different from my rank {}!".format(msg.client_id, myrank) |
| 1010 | ) |
| 1011 | # set response |
| 1012 | all_res[msgseq2pos[msg.msg_seq]] = res |
| 1013 | return all_res |
| 1014 | |
| 1015 | |
| 1016 | def remote_call_to_machine(target_and_requests, timeout=0): |
no test coverage detected