Send requests to the remote machines. This operation isn't block. It returns immediately once it sends all requests. Parameters ---------- target_and_requests : list[(int, Request)] A list of requests and the machine they should be sent to. timeout : int, optional
(target_and_requests)
| 917 | |
| 918 | |
| 919 | def send_requests_to_machine(target_and_requests): |
| 920 | """Send requests to the remote machines. |
| 921 | |
| 922 | This operation isn't block. It returns immediately once it sends all requests. |
| 923 | |
| 924 | Parameters |
| 925 | ---------- |
| 926 | target_and_requests : list[(int, Request)] |
| 927 | A list of requests and the machine they should be sent to. |
| 928 | timeout : int, optional |
| 929 | The timeout value in milliseconds. If zero, wait indefinitely. |
| 930 | |
| 931 | Returns |
| 932 | ------- |
| 933 | msgseq2pos : dict |
| 934 | map the message sequence number to its position in the input list. |
| 935 | """ |
| 936 | msgseq2pos = {} |
| 937 | for pos, (target, request) in enumerate(target_and_requests): |
| 938 | # send request |
| 939 | service_id = request.service_id |
| 940 | msg_seq = incr_msg_seq() |
| 941 | client_id = get_rank() |
| 942 | |
| 943 | server_id = random.randint( |
| 944 | target * get_num_server_per_machine(), |
| 945 | (target + 1) * get_num_server_per_machine() - 1, |
| 946 | ) |
| 947 | data, tensors = serialize_to_payload(request) |
| 948 | msg = RPCMessage( |
| 949 | service_id, |
| 950 | msg_seq, |
| 951 | client_id, |
| 952 | server_id, |
| 953 | data, |
| 954 | tensors, |
| 955 | get_group_id(), |
| 956 | ) |
| 957 | send_rpc_message(msg, server_id) |
| 958 | # check if has response |
| 959 | res_cls = get_service_property(service_id)[1] |
| 960 | if res_cls is not None: |
| 961 | msgseq2pos[msg_seq] = pos |
| 962 | return msgseq2pos |
| 963 | |
| 964 | |
| 965 | def recv_responses(msgseq2pos, timeout=0): |
no test coverage detected