
Method _rpc

pika/channel.py:1428–1512

Make a synchronous channel RPC call for a synchronous method frame. If the channel is already in the blocking state, then enqueue the request, but don't send it at this time; it will be eventually sent by `_on_synchronous_complete` after the prior blocking request receives a response.

(
        self,
        method: amqp_object.Method,
        callback: Optional[Callable[..., Any]] = None,
        acceptable_replies: Optional[Sequence[Union[
            Type[amqp_object.Method], Tuple[Type[amqp_object.Method],
                                            Dict[str, Any]]]]] = None
    )
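
The argument checks in the source can be exercised in isolation. The sketch below copies those checks into a hypothetical standalone function, `check_rpc_args`, with a hypothetical helper `outcome`; neither name exists in pika. One detail it makes visible: although the signature annotates `acceptable_replies` as a `Sequence`, the `isinstance` check in the body accepts only a `list` or `None`, so a tuple is rejected.

```python
from typing import Any, Callable, Optional


def check_rpc_args(callback: Optional[Callable[..., Any]],
                   acceptable_replies: Any) -> None:
    """Hypothetical standalone copy of the argument checks in _rpc."""
    # Only None or a real list passes; a tuple raises TypeError
    if not isinstance(acceptable_replies, (type(None), list)):
        raise TypeError('acceptable_replies should be list or None')

    if callback is not None:
        # The callback must be callable
        if not callable(callback):
            raise TypeError('callback should be None or a callable')

        # A callback needs at least one reply to wait for
        if not acceptable_replies:
            raise ValueError(
                'Unexpected callback for asynchronous (nowait) operation.')


def outcome(callback: Any, acceptable_replies: Any) -> str:
    """Return 'ok', or the name of the exception the checks raise."""
    try:
        check_rpc_args(callback, acceptable_replies)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__
    return 'ok'
```

For example, `outcome(print, [])` reports a `ValueError`, which matches the NOTE in the docstring that a callback must be accompanied by non-empty `acceptable_replies`.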

Source from the content-addressed store, hash-verified

    def _rpc(
        self,
        method: amqp_object.Method,
        callback: Optional[Callable[..., Any]] = None,
        acceptable_replies: Optional[Sequence[Union[
            Type[amqp_object.Method], Tuple[Type[amqp_object.Method],
                                            Dict[str, Any]]]]] = None
    ) -> None:
        """Make a synchronous channel RPC call for a synchronous method frame. If
        the channel is already in the blocking state, then enqueue the request,
        but don't send it at this time; it will be eventually sent by
        `_on_synchronous_complete` after the prior blocking request receives a
        response. If the channel is not in the blocking state and
        `acceptable_replies` is not empty, transition the channel to the
        blocking state and register for `_on_synchronous_complete` before
        sending the request.

        NOTE: A callback must be accompanied by non-empty acceptable_replies.

        :param pika.amqp_object.Method method: The AMQP method to invoke
        :param callable callback: The callback for the RPC response
        :param list|None acceptable_replies: A (possibly empty) sequence of
            replies this RPC call expects or None

        """
        assert method.synchronous, (
            'Only synchronous-capable methods may be used with _rpc: %r' %
            (method,))

        # Validate we got None or a list of acceptable_replies
        if not isinstance(acceptable_replies, (type(None), list)):
            raise TypeError('acceptable_replies should be list or None')

        if callback is not None:
            # Validate the callback is callable
            if not callable(callback):
                raise TypeError('callback should be None or a callable')

            # Make sure that callback is accompanied by acceptable replies
            if not acceptable_replies:
                raise ValueError(
                    'Unexpected callback for asynchronous (nowait) operation.')

        # Make sure the channel is not closed yet
        if self.is_closed:
            self._raise_if_not_open()

        # If the channel is blocking, add subsequent commands to our stack
        if self._blocking:
            LOGGER.debug(
                'Already in blocking state, so enqueueing method %s; '
                'acceptable_replies=%r', method, acceptable_replies)
            self._blocked.append([method, callback, acceptable_replies])
            return

        # Note: _send_method can throw exceptions if there are framing errors
        # or invalid data passed in. Call it here to prevent self._blocking
        # from being set if an exception is thrown. This also prevents
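
The enqueue-while-blocking behaviour described in the docstring can be modelled without a broker. The following is a minimal sketch, not pika's implementation: `ToyChannel`, its attributes, and the drain loop in `on_synchronous_complete` are hypothetical, and the drain logic is an assumption, since the body of `_on_synchronous_complete` is not shown in this listing. Sending happens before the blocking state is set, following the comment at the end of the listing.

```python
from collections import deque


class ToyChannel:
    """Hypothetical stand-in for a channel; not part of pika."""

    def __init__(self):
        self.blocking = None     # name of the request awaiting a reply
        self.blocked = deque()   # requests deferred while blocking
        self.sent = []           # record of what went "on the wire"

    def rpc(self, method, acceptable_replies=None):
        if self.blocking is not None:
            # A prior request is still pending: enqueue, do not send
            self.blocked.append((method, acceptable_replies))
            return
        # Send first, so a failed send would not leave us blocking
        self.sent.append(method)
        if acceptable_replies:
            # A reply is expected: enter the blocking state
            self.blocking = method

    def on_synchronous_complete(self):
        """Simulate the reply to the pending request arriving."""
        self.blocking = None
        # Assumed drain logic: resend deferred requests until one blocks
        while self.blocked and self.blocking is None:
            self.rpc(*self.blocked.popleft())
```

With three requests issued back to back, only the first is sent immediately; each simulated reply releases the next one in FIFO order.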

Callers 15

basic_cancel · Method · 0.95
basic_consume · Method · 0.95
basic_qos · Method · 0.95
basic_recover · Method · 0.95
close · Method · 0.95
confirm_delivery · Method · 0.95
exchange_bind · Method · 0.95
exchange_declare · Method · 0.95
exchange_delete · Method · 0.95
exchange_unbind · Method · 0.95
flow · Method · 0.95
open · Method · 0.95

Calls 3

_raise_if_not_open · Method · 0.95
_send_method · Method · 0.95
add · Method · 0.80