Send a certain request to server. Parameters ---------- request_type : str type of proposed request, 'calendar'/'instrument'/'feature'. request_content : dict records the information of the request. msg_proc_func : func the
(self, request_type, request_content, msg_queue, msg_proc_func=None)
| 47 | self.logger.error("Cannot disconnect from server : %s" % e) |
| 48 | |
| 49 | def send_request(self, request_type, request_content, msg_queue, msg_proc_func=None): |
| 50 | """Send a certain request to server. |
| 51 | |
| 52 | Parameters |
| 53 | ---------- |
| 54 | request_type : str |
| 55 | type of proposed request, 'calendar'/'instrument'/'feature'. |
| 56 | request_content : dict |
| 57 | records the information of the request. |
| 58 | msg_proc_func : func |
| 59 | the function to process the message when receiving response, should have arg `*args`. |
| 60 | msg_queue: Queue |
| 61 | The queue to pass the message after callback. |
| 62 | """ |
| 63 | head_info = {"version": qlib.__version__} |
| 64 | |
| 65 | def request_callback(*args): |
| 66 | """callback_wrapper |
| 67 | |
| 68 | :param *args: args[0] is the response content |
| 69 | """ |
| 70 | # args[0] is the response content |
| 71 | self.logger.debug("receive data and enter queue") |
| 72 | msg = dict(args[0]) |
| 73 | if msg["detailed_info"] is not None: |
| 74 | if msg["status"] != 0: |
| 75 | self.logger.error(msg["detailed_info"]) |
| 76 | else: |
| 77 | self.logger.info(msg["detailed_info"]) |
| 78 | if msg["status"] != 0: |
| 79 | ex = ValueError(f"Bad response(status=={msg['status']}), detailed info: {msg['detailed_info']}") |
| 80 | msg_queue.put(ex) |
| 81 | else: |
| 82 | if msg_proc_func is not None: |
| 83 | try: |
| 84 | ret = msg_proc_func(msg["result"]) |
| 85 | except Exception as e: |
| 86 | self.logger.exception("Error when processing message.") |
| 87 | ret = e |
| 88 | else: |
| 89 | ret = msg["result"] |
| 90 | msg_queue.put(ret) |
| 91 | self.disconnect() |
| 92 | self.logger.debug("disconnected") |
| 93 | |
| 94 | self.logger.debug("try connecting") |
| 95 | self.connect_server() |
| 96 | self.logger.debug("connected") |
| 97 | # The pickle is for passing some parameters with special type(such as |
| 98 | # pd.Timestamp) |
| 99 | request_content = {"head": head_info, "body": pickle.dumps(request_content, protocol=C.dump_protocol_version)} |
| 100 | self.sio.on(request_type + "_response", request_callback) |
| 101 | self.logger.debug("try sending") |
| 102 | self.sio.emit(request_type + "_request", request_content) |
| 103 | self.sio.wait() |
no test coverage detected