MCPcopy
hub / github.com/pika/pika / test

Method test

tests/acceptance/blocking_adapter_test.py:1169–1208  ·  view source on GitHub ↗

BlockingChannel.basic_reject with requeue=False

(self)

Source from the content-addressed store, hash-verified

class TestBasicRejectNoRequeue(BlockingTestCaseBase):

    def test(self):
        """BlockingChannel.basic_reject with requeue=False

        Publishes two messages to a freshly declared exclusive queue via the
        default exchange, fetches both without auto-ack, rejects only the
        second one with requeue=False, and finally checks that the queue
        reports a message count of zero. The first message is neither acked
        nor rejected by this test.
        """
        conn = self._connect()
        channel = conn.channel()

        queue_name = 'TestBasicRejectNoRequeue_q' + uuid.uuid1().hex
        payloads = ('TestBasicRejectNoRequeue1', 'TestBasicRejectNoRequeue2')

        # Switch the channel to publisher-acknowledgments mode so that each
        # message published with mandatory=True may be delivered
        # synchronously to the queue
        channel.confirm_delivery()

        # Declare a new queue
        channel.queue_declare(queue_name, exclusive=True)

        # Deposit both messages in the queue via the default exchange
        for payload in payloads:
            channel.basic_publish(exchange='',
                                  routing_key=queue_name,
                                  body=payload,
                                  mandatory=True)

        # Fetch the messages in publish order; only the method frame of the
        # last (second) delivery is needed afterwards
        last_method = None
        for payload in payloads:
            last_method, _, received = channel.basic_get(queue_name,
                                                         auto_ack=False)
            self.assertEqual(received, as_bytes(payload))

        # Reject the second message without requeueing it
        channel.basic_reject(last_method.delivery_tag, requeue=False)

        # Verify that the queue's message count is zero
        self._assert_exact_message_count_with_retries(channel=channel,
                                                      queue=queue_name,
                                                      expected_count=0)
1209
1210
1211class TestBasicNack(BlockingTestCaseBase):

Callers

nothing calls this directly

Calls 9

as_bytesFunction · 0.90
_connectMethod · 0.45
channelMethod · 0.45
confirm_deliveryMethod · 0.45
queue_declareMethod · 0.45
basic_publishMethod · 0.45
basic_getMethod · 0.45
basic_rejectMethod · 0.45

Tested by

no test coverage detected