An RPC Server that listens for requests and executes them concurrently.
| 17 | |
| 18 | |
| 19 | class RPCServer: |
| 20 | """ |
| 21 | An RPC Server that listens for requests and executes them concurrently. |
| 22 | """ |
| 23 | |
| 24 | def __init__(self, |
| 25 | instance: Any, |
| 26 | hmac_key: Optional[bytes] = None, |
| 27 | num_workers: int = 4, |
| 28 | timeout: float = 0.5, |
| 29 | async_run_task: bool = False) -> None: |
| 30 | """ |
| 31 | Initializes the server with an instance. |
| 32 | |
| 33 | Args: |
| 34 | instance: The instance whose methods will be exposed via RPC. |
| 35 | hmac_key (bytes, optional): HMAC key for encryption. |
| 36 | num_workers (int): Number of worker threads or worker tasks that help parallelize the task execution. |
| 37 | timeout (float): Timeout for RPC calls. |
| 38 | async_run_task (bool): Whether to run the task asynchronously. |
| 39 | |
| 40 | NOTE: Use a larger num_workers; otherwise remote() and remote_future() may |
| 41 | be blocked by the thread pool. |
| 42 | """ |
| 43 | self._instance = instance |
| 44 | self._hmac_key = hmac_key |
| 45 | self._num_workers = num_workers |
| 46 | self._address = None |
| 47 | self._timeout = timeout |
| 48 | self._client_socket = None |
| 49 | |
| 50 | # Asyncio components |
| 51 | self._loop: Optional[asyncio.AbstractEventLoop] = None |
| 52 | self._main_task: Optional[asyncio.Task] = None |
| 53 | self._worker_tasks: List[asyncio.Task] = [] |
| 54 | self._shutdown_event: Optional[asyncio.Event] = None |
| 55 | self._server_thread: Optional[threading.Thread] = None |
| 56 | |
| 57 | self._stop_event: threading.Event = threading.Event( |
| 58 | ) # for thread-safe shutdown |
| 59 | |
| 60 | self._num_pending_requests = 0 |
| 61 | |
| 62 | self._functions: Dict[str, Callable[..., Any]] = { |
| 63 | # Some built-in methods for RPC server |
| 64 | "_rpc_shutdown": lambda: self.shutdown(is_remote_call=True), |
| 65 | "_rpc_get_attr": lambda name: self.get_attr(name), |
| 66 | } |
| 67 | |
| 68 | if async_run_task: |
| 69 | self._executor = ThreadPoolExecutor( |
| 70 | max_workers=num_workers, thread_name_prefix="rpc_server_worker") |
| 71 | else: |
| 72 | self._executor = None |
| 73 | |
| 74 | self.register_instance(instance) |
| 75 | |
| 76 | logger_debug( |
no outgoing calls