Call a remote async generator method and get streaming results. Implementation note: The outgoing request is sent on the RPCClient’s private event-loop to obey the single-loop rule. The returned items are yielded in the caller’s loop via AsyncQueue, which is thread
(self, name: str, *args,
**kwargs)
| 561 | return self._executor.submit(_async_to_sync) |
| 562 | |
| 563 | async def _call_streaming(self, name: str, *args, |
| 564 | **kwargs) -> AsyncIterator[Any]: |
| 565 | """ |
| 566 | Call a remote async generator method and get streaming results. |
| 567 | |
| 568 | Implementation note: The outgoing request is sent on the RPCClient’s |
| 569 | private event-loop to obey the single-loop rule. The returned items |
| 570 | are yielded in the caller’s loop via AsyncQueue, which is thread-safe. |
| 571 | """ |
| 572 | nvtx_mark_debug(f"RPC.streaming.{name}", color="red", category="RPC") |
| 573 | |
| 574 | if self._server_stopped: |
| 575 | raise RPCCancelled("Server is shutting down, request cancelled") |
| 576 | rpc_params = kwargs.pop("__rpc_params", RPCParams()) |
| 577 | timeout = rpc_params.timeout if rpc_params.timeout is not None else self._timeout |
| 578 | |
| 579 | request_id = uuid.uuid4().hex |
| 580 | # Use AsyncQueue to ensure proper cross-thread communication |
| 581 | queue = AsyncQueue() |
| 582 | # Recreate sync_q with the current running loop for proper cross-thread communication |
| 583 | queue._sync_q = _SyncQueue(queue, asyncio.get_running_loop()) |
| 584 | |
| 585 | # Register queue with lock to ensure thread-safe access |
| 586 | with self._streaming_queues_lock: |
| 587 | self._streaming_queues[request_id] = queue |
| 588 | #logger_debug(f"[{datetime.now().isoformat()}] Registered streaming queue for request_id={request_id}") |
| 589 | |
| 590 | # Build the RPCRequest object here – it's pickle-able and small – but |
| 591 | # *do not* touch the ZeroMQ socket from this (caller) event-loop. |
| 592 | request = RPCRequest(request_id, |
| 593 | method_name=name, |
| 594 | args=args, |
| 595 | kwargs=kwargs, |
| 596 | need_response=True, |
| 597 | timeout=timeout, |
| 598 | is_streaming=True) |
| 599 | |
| 600 | # Send the request on the RPCClient's dedicated loop to guarantee that |
| 601 | # **all** socket I/O happens from exactly one thread / loop. |
| 602 | async def _send_streaming_request(req: RPCRequest): |
| 603 | """Private helper executed in the client loop to put the request.""" |
| 604 | logger_debug( |
| 605 | f"[client] [{datetime.now().isoformat()}] Sending streaming request: {req.method_name}, request_id={req.request_id}" |
| 606 | ) |
| 607 | await self._client_socket.put_async(req) |
| 608 | logger_debug( |
| 609 | f"[client][{datetime.now().isoformat()}] Streaming request sent successfully: {req.method_name}, request_id={req.request_id}" |
| 610 | ) |
| 611 | |
| 612 | send_future = asyncio.run_coroutine_threadsafe( |
| 613 | _send_streaming_request(request), self._loop) |
| 614 | |
| 615 | # Wait until the request is actually on the wire before entering the |
| 616 | # user-visible streaming loop. We wrap the concurrent.futures.Future so |
| 617 | # we can await it in the caller's asyncio context. |
| 618 | await asyncio.wrap_future(send_future) |
| 619 | |
| 620 | try: |
nothing calls this directly
no test coverage detected