def recv_request(timeout=0):
    """Receive one request.

    Receive one :class:`RPCMessage` and de-serialize it into a proper Request object.

    The operation is blocking -- it returns when it receives any message
    or it times out.

    Parameters
    ----------
    timeout : int, optional
        The timeout value in milliseconds. If zero, wait indefinitely.

    Returns
    -------
    req : request
        One request received from the target, or None if it times out.
    client_id : int
        Client ID received from the target, or -1 if it times out.
    group_id : int
        Group ID received from the target, or -1 if it times out.

    Raises
    ------
    ConnectionError
        If there is any problem with the connection.
    DGLError
        If no request class is registered for the message's service ID,
        or if the message was addressed to a different server rank.
    """
    msg = recv_rpc_message(timeout)
    if msg is None:
        # Timed out: signal this with the sentinel triple.
        return None, -1, -1
    set_msg_seq(msg.msg_seq)
    # Look up the request class registered for this service ID.
    req_cls, _ = SERVICE_ID_TO_PROPERTY[msg.service_id]
    if req_cls is None:
        raise DGLError(
            "Got request message from service ID {}, "
            "but no request class is registered.".format(msg.service_id)
        )
    req = deserialize_from_payload(req_cls, msg.data, msg.tensors)
    # Reject messages that were addressed to a different server.
    if msg.server_id != get_rank():
        raise DGLError(
            "Got request sent to server {}, "
            "different from my rank {}!".format(msg.server_id, get_rank())
        )
    return req, msg.client_id, msg.group_id


def recv_response(timeout=0):
nothing calls this directly
no test coverage detected
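The return contract of `recv_request` (a `(req, client_id, group_id)` triple, or `(None, -1, -1)` on timeout) suggests a polling loop on the caller's side. The following is a minimal sketch of such a loop, not code from the library: `fake_recv_request`, `_inbox`, `serve`, and `max_idle_polls` are hypothetical names introduced here so the example runs without an RPC connection. A real caller would presumably pass `recv_request` itself in place of the stand-in.

```python
import queue

# Hypothetical in-memory inbox standing in for the RPC transport.
_inbox = queue.Queue()


def fake_recv_request(timeout=0):
    """Stand-in that mimics the documented return contract of recv_request.

    `timeout` is in milliseconds; zero means wait indefinitely.
    """
    # queue.Queue.get expects seconds; None blocks until an item arrives.
    wait = None if timeout == 0 else timeout / 1000.0
    try:
        return _inbox.get(timeout=wait)
    except queue.Empty:
        # Same sentinel triple the docstring describes for a timeout.
        return None, -1, -1


def serve(recv, timeout_ms=50, max_idle_polls=2):
    """Collect requests until `max_idle_polls` consecutive timeouts occur."""
    handled = []
    idle = 0
    while idle < max_idle_polls:
        req, client_id, group_id = recv(timeout_ms)
        if req is None:
            # Timed out: count the idle poll and try again.
            idle += 1
            continue
        idle = 0
        handled.append((req, client_id, group_id))
    return handled


# Two fake requests, each already paired with a client ID and a group ID.
_inbox.put(("ping", 3, 0))
_inbox.put(("pull", 7, 1))
handled = serve(fake_recv_request)
print(handled)
```

Note that the loop passes a non-zero timeout. With `timeout=0` the call would block indefinitely, so the idle counter would never advance.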