Process a request. Returns None for streaming requests (handled separately).
(self, req: RPCRequest)
| 445 | return adjusted_timeout |
| 446 | |
async def _process_request(self, req: RPCRequest) -> Optional[RPCResponse]:
    """Execute one RPC request and package its outcome as an RPCResponse.

    The target callable is looked up in ``self._functions`` under
    ``req.method_name``. Coroutine functions are awaited directly on the
    running event loop; any other callable is submitted to
    ``self._executor``. In both cases the wait is bounded by the value
    returned from ``self._calculate_adjusted_timeout(req)``.

    Args:
        req: The request to run. This method reads ``method_name``,
            ``args``, ``kwargs``, ``request_id`` and ``timeout`` from it.

    Returns:
        An ``RPCResponse`` for ``req.request_id`` holding either the call's
        result, an ``RPCTimeout`` error if the wait timed out, or an
        ``RPCError`` wrapping any other exception raised by the call.
        NOTE(review): the return annotation is ``Optional`` and the previous
        docstring said ``None`` is returned for streaming requests (handled
        separately), but no path in this body returns ``None`` -- confirm
        against the callers.

    Note:
        The ``self._functions`` lookup and the timeout calculation happen
        before the ``try`` block, so an exception raised by either one
        propagates to the caller instead of becoming an error response.
    """
    func = self._functions[req.method_name]

    # Shrink the caller's timeout to account for pending overhead.
    time_budget = self._calculate_adjusted_timeout(req)

    try:
        if inspect.iscoroutinefunction(func):
            # Coroutines run straight on the event loop; because of the GIL
            # there is nothing to gain from pushing them to the executor.
            logger_debug(
                f"[server] RPC Server running async task {req.method_name} in dispatcher"
            )
            pending = func(*req.args, **req.kwargs)
        else:
            # Blocking callables are handed off to the thread executor so
            # they do not stall the event loop.
            event_loop = asyncio.get_running_loop()

            def call_with_kwargs():
                return func(*req.args, **req.kwargs)

            logger_debug(
                f"[server] RPC Server running async task {req.method_name} in worker"
            )
            # TODO: let num worker control the pool size
            pending = event_loop.run_in_executor(self._executor,
                                                 call_with_kwargs)

        # Both branches yield an awaitable; apply the same deadline to it.
        outcome = await asyncio.wait_for(pending, timeout=time_budget)
        return RPCResponse(req.request_id, result=outcome)

    except asyncio.TimeoutError:
        # The message reports the timeout requested by the caller
        # (req.timeout), not the adjusted budget used for the wait.
        timeout_error = RPCTimeout(
            f"Method '{req.method_name}' timed out after {req.timeout} seconds",
            traceback=traceback.format_exc())
        return RPCResponse(req.request_id, result=None, error=timeout_error)

    except Exception as e:
        # Any other failure is reported back to the caller rather than raised.
        wrapped_error = RPCError(str(e),
                                 cause=e,
                                 traceback=traceback.format_exc())
        return RPCResponse(req.request_id, result=None, error=wrapped_error)
| 496 | |
| 497 | async def _process_streaming_request(self, req: RPCRequest) -> None: |
| 498 | """Process a streaming request by sending multiple responses.""" |
no test coverage detected