BlockingChannel.basic_recover with requeue=True. NOTE: the requeue=False option is not supported by RabbitMQ broker as of this writing (using RabbitMQ 3.5.1)
(self)
| 1350 | class TestBasicRecoverWithRequeue(BlockingTestCaseBase): |
| 1351 | |
| 1352 | def test(self): |
| 1353 | """BlockingChannel.basic_recover with requeue=True. |
| 1354 | |
| 1355 | NOTE: the requeue=False option is not supported by RabbitMQ broker as |
| 1356 | of this writing (using RabbitMQ 3.5.1) |
| 1357 | """ |
| 1358 | connection = self._connect() |
| 1359 | |
| 1360 | ch = connection.channel() |
| 1361 | |
| 1362 | q_name = ('TestBasicRecoverWithRequeue_q' + uuid.uuid1().hex) |
| 1363 | |
| 1364 | # Place channel in publisher-acknowledgments mode so that the message |
| 1365 | # may be delivered synchronously to the queue by publishing it with |
| 1366 | # mandatory=True |
| 1367 | ch.confirm_delivery() |
| 1368 | |
| 1369 | # Declare a new queue |
| 1370 | ch.queue_declare(q_name, exclusive=True) |
| 1371 | |
| 1372 | # Deposit two messages in the queue via default exchange |
| 1373 | ch.basic_publish(exchange='', |
| 1374 | routing_key=q_name, |
| 1375 | body='TestBasicRecoverWithRequeue1', |
| 1376 | mandatory=True) |
| 1377 | ch.basic_publish(exchange='', |
| 1378 | routing_key=q_name, |
| 1379 | body='TestBasicRecoverWithRequeue2', |
| 1380 | mandatory=True) |
| 1381 | |
| 1382 | rx_messages = [] |
| 1383 | num_messages = 0 |
| 1384 | for msg in ch.consume(q_name, auto_ack=False): |
| 1385 | num_messages += 1 |
| 1386 | |
| 1387 | if num_messages == 2: |
| 1388 | ch.basic_recover(requeue=True) |
| 1389 | |
| 1390 | if num_messages > 2: |
| 1391 | rx_messages.append(msg) |
| 1392 | |
| 1393 | if num_messages == 4: |
| 1394 | break |
| 1395 | else: |
| 1396 | self.fail('consumer aborted prematurely') |
| 1397 | |
| 1398 | # Get the messages |
| 1399 | (_, _, rx_body) = rx_messages[0] |
| 1400 | self.assertEqual(rx_body, as_bytes('TestBasicRecoverWithRequeue1')) |
| 1401 | |
| 1402 | (_, _, rx_body) = rx_messages[1] |
| 1403 | self.assertEqual(rx_body, as_bytes('TestBasicRecoverWithRequeue2')) |
| 1404 | |
| 1405 | |
| 1406 | class TestTxCommit(BlockingTestCaseBase): |
nothing calls this directly
no test coverage detected