
Method basic_cancel

pika/channel.py:253–311

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
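
As a usage illustration, the sketch below wraps the call in a small helper. `stop_consumer` and `on_cancel_ok` are hypothetical names, not part of pika. `channel` is assumed to be an open channel exposing `basic_cancel` with the signature documented here, and the callback is assumed to receive a `pika.frame.Method` whose `method` attribute carries the `Basic.CancelOk` fields.

```python
import logging

LOGGER = logging.getLogger(__name__)


def on_cancel_ok(frame):
    """Hypothetical completion callback for Basic.CancelOk.

    Assumes `frame` is a pika.frame.Method whose `.method` carries the
    cancelled consumer's tag.
    """
    LOGGER.info('Consumer cancelled: %s', frame.method.consumer_tag)


def stop_consumer(channel, consumer_tag, wait_for_ok=True):
    """Ask the broker to stop delivering to `consumer_tag`.

    With wait_for_ok=False no callback is passed, so no Basic.CancelOk
    response is expected. Returns the callback that was passed, or None.
    """
    callback = on_cancel_ok if wait_for_ok else None
    channel.basic_cancel(consumer_tag=consumer_tag, callback=callback)
    return callback
```

Because deliveries can still arrive until the cancel-ok reply is received, the consumer's message handler should remain able to process messages after `stop_consumer` returns.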

(self,
 consumer_tag: str = '',
 callback: Optional[_OnBasicCancelCallback] = None)
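
The `callback` argument doubles as the nowait switch: per the method's docstring, `None` means no `Basic.CancelOk` response is expected, and any other value must be callable. A minimal sketch of that rule follows. `nowait_from_callback` is a hypothetical stand-in for illustration, not pika's `validators.rpc_completion_callback`, and the `TypeError` it raises is an assumption about how a non-callable value might be rejected.

```python
def nowait_from_callback(callback):
    """Return True (nowait) when no completion callback is supplied.

    Hypothetical helper; the exception type is an assumption.
    """
    if callback is None:
        # No callback: fire-and-forget, no reply is expected.
        return True
    if not callable(callback):
        raise TypeError('callback must be callable or None')
    # A callable was supplied: a reply is expected and is routed to it.
    return False
```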

Source from the content-addressed store, hash-verified

        return self._send_method(spec.Basic.Ack(delivery_tag, multiple))

    def basic_cancel(self,
                     consumer_tag: str = '',
                     callback: Optional[_OnBasicCancelCallback] = None) -> None:
        """This method cancels a consumer. This does not affect already
        delivered messages, but it does mean the server will not send any more
        messages for that consumer. The client may receive an arbitrary number
        of messages in between sending the cancel method and receiving the
        cancel-ok reply. It may also be sent from the server to the client in
        the event of the consumer being unexpectedly cancelled (i.e. cancelled
        for any reason other than the server receiving the corresponding
        basic.cancel from the client). This allows clients to be notified of
        the loss of consumers due to events such as queue deletion.

        :param str consumer_tag: Identifier for the consumer
        :param callable callback: callback(pika.frame.Method) for method
            Basic.CancelOk. If None, do not expect a Basic.CancelOk response,
            otherwise, callback must be callable

        :raises ValueError:

        """
        validators.require_string(consumer_tag, 'consumer_tag')
        self._raise_if_not_open()
        nowait = validators.rpc_completion_callback(callback)

        if consumer_tag in self._cancelled:
            # We check for cancelled first, because basic_cancel removes
            # consumers closed with nowait from self._consumers
            LOGGER.warning('basic_cancel - consumer is already cancelling: %s',
                           consumer_tag)
            return

        if consumer_tag not in self._consumers:
            # Could be cancelled by user or broker earlier
            LOGGER.warning('basic_cancel - consumer not found: %s',
                           consumer_tag)
            return

        LOGGER.debug('Cancelling consumer: %s (nowait=%s)', consumer_tag,
                     nowait)

        if nowait:
            # This is our last opportunity while the channel is open to remove
            # this consumer callback and help gc; unfortunately, this consumer's
            # self._cancelled and self._consumers_with_noack (if any) entries
            # will persist until the channel is closed.
            del self._consumers[consumer_tag]

        if callback is not None:
            self.callbacks.add(self.channel_number, spec.Basic.CancelOk,
                               callback)

        self._cancelled.add(consumer_tag)

        self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, nowait=nowait),
                  self._on_cancelok if not nowait else None,
                  [(spec.Basic.CancelOk, {
                      'consumer_tag': consumer_tag
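
The order of the two guard checks matters, as the source comment notes: a nowait cancel removes the tag from `_consumers` right away, so checking `_cancelled` first is what lets a repeated cancel be reported as "already cancelling" rather than "not found". The toy model below reproduces only that bookkeeping. `ConsumerBook` and its return strings are hypothetical; nothing is sent to a broker, and the logging, validation and RPC steps are left out.

```python
class ConsumerBook:
    """Toy model of the _consumers / _cancelled bookkeeping (hypothetical)."""

    def __init__(self):
        self._consumers = {}     # consumer_tag -> message callback
        self._cancelled = set()  # tags for which a cancel was already issued

    def add(self, consumer_tag, on_message):
        self._consumers[consumer_tag] = on_message

    def cancel(self, consumer_tag, callback=None):
        nowait = callback is None

        # Checked first: a nowait cancel has already removed the tag from
        # _consumers, so the second check alone would misreport it.
        if consumer_tag in self._cancelled:
            return 'already cancelling'

        if consumer_tag not in self._consumers:
            return 'not found'

        if nowait:
            # No reply will arrive, so drop the message callback now.
            del self._consumers[consumer_tag]

        self._cancelled.add(consumer_tag)
        return 'cancel sent (nowait=%s)' % nowait


book = ConsumerBook()
book.add('ctag-1', lambda body: None)
book.add('ctag-2', lambda body: None)

print(book.cancel('ctag-1'))                      # nowait path
print(book.cancel('ctag-1'))                      # repeated cancel
print(book.cancel('ctag-2', lambda frame: None))  # waits for cancel-ok
print(book.cancel('missing'))                     # never registered
```

In this model, as in the source comments, the `_cancelled` entry is never cleared by the cancel path itself, which is why the tag stays recognisable after its `_consumers` entry is gone.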

Calls 3

_raise_if_not_open · Method · 0.95
_rpc · Method · 0.95
add · Method · 0.80