Invoke registered services on remote servers and collect responses. The operation is blocking -- it returns when it receives all responses or it times out. If the target server state is available locally, it invokes local computation to calculate the response. Parameters -
(target_and_requests, timeout=0)
| 837 | |
| 838 | |
| 839 | def remote_call(target_and_requests, timeout=0): |
| 840 | """Invoke registered services on remote servers and collect responses. |
| 841 | |
| 842 | The operation is blocking -- it returns when it receives all responses |
| 843 | or it times out. |
| 844 | |
| 845 | If the target server state is available locally, it invokes local computation |
| 846 | to calculate the response. |
| 847 | |
| 848 | Parameters |
| 849 | ---------- |
| 850 | target_and_requests : list[(int, Request)] |
| 851 | A list of requests and the server they should be sent to. |
| 852 | timeout : int, optional |
| 853 | The timeout value in milliseconds. If zero, wait indefinitely. |
| 854 | |
| 855 | Returns |
| 856 | ------- |
| 857 | list[Response] |
| 858 | Responses for each target-request pair. If the request does not have |
| 859 | response, None is placed. |
| 860 | |
| 861 | Raises |
| 862 | ------ |
| 863 | ConnectionError if there is any problem with the connection. |
| 864 | """ |
| 865 | all_res = [None] * len(target_and_requests) |
| 866 | msgseq2pos = {} |
| 867 | num_res = 0 |
| 868 | myrank = get_rank() |
| 869 | for pos, (target, request) in enumerate(target_and_requests): |
| 870 | # send request |
| 871 | service_id = request.service_id |
| 872 | msg_seq = incr_msg_seq() |
| 873 | client_id = get_rank() |
| 874 | server_id = random.randint( |
| 875 | target * get_num_server_per_machine(), |
| 876 | (target + 1) * get_num_server_per_machine() - 1, |
| 877 | ) |
| 878 | data, tensors = serialize_to_payload(request) |
| 879 | msg = RPCMessage( |
| 880 | service_id, |
| 881 | msg_seq, |
| 882 | client_id, |
| 883 | server_id, |
| 884 | data, |
| 885 | tensors, |
| 886 | get_group_id(), |
| 887 | ) |
| 888 | send_rpc_message(msg, server_id) |
| 889 | # check if has response |
| 890 | res_cls = get_service_property(service_id)[1] |
| 891 | if res_cls is not None: |
| 892 | num_res += 1 |
| 893 | msgseq2pos[msg_seq] = pos |
| 894 | while num_res != 0: |
| 895 | # recv response |
| 896 | msg = recv_rpc_message(timeout) |
nothing calls this directly
no test coverage detected