This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
(self,
consumer_tag: str = '',
callback: Optional[_OnBasicCancelCallback] = None)
| 251 | return self._send_method(spec.Basic.Ack(delivery_tag, multiple)) |
| 252 | |
| 253 | def basic_cancel(self, |
| 254 | consumer_tag: str = '', |
| 255 | callback: Optional[_OnBasicCancelCallback] = None) -> None: |
| 256 | """This method cancels a consumer. This does not affect already |
| 257 | delivered messages, but it does mean the server will not send any more |
| 258 | messages for that consumer. The client may receive an arbitrary number |
| 259 | of messages in between sending the cancel method and receiving the |
| 260 | cancel-ok reply. It may also be sent from the server to the client in |
| 261 | the event of the consumer being unexpectedly cancelled (i.e. cancelled |
| 262 | for any reason other than the server receiving the corresponding |
| 263 | basic.cancel from the client). This allows clients to be notified of |
| 264 | the loss of consumers due to events such as queue deletion. |
| 265 | |
| 266 | :param str consumer_tag: Identifier for the consumer |
| 267 | :param callable callback: callback(pika.frame.Method) for method |
| 268 | Basic.CancelOk. If None, do not expect a Basic.CancelOk response, |
| 269 | otherwise, callback must be callable |
| 270 | |
| 271 | :raises ValueError: |
| 272 | |
| 273 | """ |
| 274 | validators.require_string(consumer_tag, 'consumer_tag') |
| 275 | self._raise_if_not_open() |
| 276 | nowait = validators.rpc_completion_callback(callback) |
| 277 | |
| 278 | if consumer_tag in self._cancelled: |
| 279 | # We check for cancelled first, because basic_cancel removes |
| 280 | # consumers closed with nowait from self._consumers |
| 281 | LOGGER.warning('basic_cancel - consumer is already cancelling: %s', |
| 282 | consumer_tag) |
| 283 | return |
| 284 | |
| 285 | if consumer_tag not in self._consumers: |
| 286 | # Could be cancelled by user or broker earlier |
| 287 | LOGGER.warning('basic_cancel - consumer not found: %s', |
| 288 | consumer_tag) |
| 289 | return |
| 290 | |
| 291 | LOGGER.debug('Cancelling consumer: %s (nowait=%s)', consumer_tag, |
| 292 | nowait) |
| 293 | |
| 294 | if nowait: |
| 295 | # This is our last opportunity while the channel is open to remove |
| 296 | # this consumer callback and help gc; unfortunately, this consumer's |
| 297 | # self._cancelled and self._consumers_with_noack (if any) entries |
| 298 | # will persist until the channel is closed. |
| 299 | del self._consumers[consumer_tag] |
| 300 | |
| 301 | if callback is not None: |
| 302 | self.callbacks.add(self.channel_number, spec.Basic.CancelOk, |
| 303 | callback) |
| 304 | |
| 305 | self._cancelled.add(consumer_tag) |
| 306 | |
| 307 | self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, nowait=nowait), |
| 308 | self._on_cancelok if not nowait else None, |
| 309 | [(spec.Basic.CancelOk, { |
| 310 | 'consumer_tag': consumer_tag |