Make a synchronous channel RPC call for a synchronous method frame. If the channel is already in the blocking state, then enqueue the request, but don't send it at this time; it will eventually be sent by `_on_synchronous_complete` after the prior blocking request receives a response.
(
self,
method: amqp_object.Method,
callback: Optional[Callable[..., Any]] = None,
acceptable_replies: Optional[Sequence[Union[
Type[amqp_object.Method], Tuple[Type[amqp_object.Method],
Dict[str, Any]]]]] = None
)
| 1426 | LOGGER.debug('Ignoring drained blocked method: %s', method) |
| 1427 | |
| 1428 | def _rpc( |
| 1429 | self, |
| 1430 | method: amqp_object.Method, |
| 1431 | callback: Optional[Callable[..., Any]] = None, |
| 1432 | acceptable_replies: Optional[Sequence[Union[ |
| 1433 | Type[amqp_object.Method], Tuple[Type[amqp_object.Method], |
| 1434 | Dict[str, Any]]]]] = None |
| 1435 | ) -> None: |
| 1436 | """Make a synchronous channel RPC call for a synchronous method frame. If |
| 1437 | the channel is already in the blocking state, then enqueue the request, |
| 1438 | but don't send it at this time; it will be eventually sent by |
| 1439 | `_on_synchronous_complete` after the prior blocking request receives a |
| 1440 | response. If the channel is not in the blocking state and |
| 1441 | `acceptable_replies` is not empty, transition the channel to the |
| 1442 | blocking state and register for `_on_synchronous_complete` before |
| 1443 | sending the request. |
| 1444 | |
| 1445 | NOTE: A callback must be accompanied by non-empty acceptable_replies. |
| 1446 | |
| 1447 | :param pika.amqp_object.Method method: The AMQP method to invoke |
| 1448 | :param callable callback: The callback for the RPC response |
| 1449 | :param list|None acceptable_replies: A (possibly empty) sequence of |
| 1450 | replies this RPC call expects or None |
| 1451 | |
| 1452 | """ |
| 1453 | assert method.synchronous, ( |
| 1454 | 'Only synchronous-capable methods may be used with _rpc: %r' % |
| 1455 | (method,)) |
| 1456 | |
| 1457 | # Validate we got None or a list of acceptable_replies |
| 1458 | if not isinstance(acceptable_replies, (type(None), list)): |
| 1459 | raise TypeError('acceptable_replies should be list or None') |
| 1460 | |
| 1461 | if callback is not None: |
| 1462 | # Validate the callback is callable |
| 1463 | if not callable(callback): |
| 1464 | raise TypeError('callback should be None or a callable') |
| 1465 | |
| 1466 | # Make sure that callback is accompanied by acceptable replies |
| 1467 | if not acceptable_replies: |
| 1468 | raise ValueError( |
| 1469 | 'Unexpected callback for asynchronous (nowait) operation.') |
| 1470 | |
| 1471 | # Make sure the channel is not closed yet |
| 1472 | if self.is_closed: |
| 1473 | self._raise_if_not_open() |
| 1474 | |
| 1475 | # If the channel is blocking, add subsequent commands to our stack |
| 1476 | if self._blocking: |
| 1477 | LOGGER.debug( |
| 1478 | 'Already in blocking state, so enqueueing method %s; ' |
| 1479 | 'acceptable_replies=%r', method, acceptable_replies) |
| 1480 | self._blocked.append([method, callback, acceptable_replies]) |
| 1481 | return |
| 1482 | |
| 1483 | # Note: _send_method can throw exceptions if there are framing errors |
| 1484 | # or invalid data passed in. Call it here to prevent self._blocking |
| 1485 | # from being set if an exception is thrown. This also prevents |