BlockingChannel: Test queue_bind and queue_unbind
(self)
| 986 | class TestQueueBindAndUnbindAndPurge(BlockingTestCaseBase): |
| 987 | |
| 988 | def test(self): |
| 989 | """BlockingChannel: Test queue_bind and queue_unbind""" |
| 990 | connection = self._connect() |
| 991 | |
| 992 | ch = connection.channel() |
| 993 | |
| 994 | q_name = 'TestQueueBindAndUnbindAndPurge_q' + uuid.uuid1().hex |
| 995 | exg_name = 'TestQueueBindAndUnbindAndPurge_exg_' + uuid.uuid1().hex |
| 996 | routing_key = 'TestQueueBindAndUnbindAndPurge' |
| 997 | |
| 998 | # Place channel in publisher-acknowledgments mode so that we may test |
| 999 | # whether the queue is reachable by publishing with mandatory=True |
| 1000 | res = ch.confirm_delivery() |
| 1001 | self.assertIsNone(res) |
| 1002 | |
| 1003 | # Declare a new exchange |
| 1004 | ch.exchange_declare(exg_name, exchange_type=ExchangeType.direct) |
| 1005 | self.addCleanup(connection.channel().exchange_delete, exg_name) |
| 1006 | |
| 1007 | # Declare a new queue |
| 1008 | ch.queue_declare(q_name, exclusive=True) |
| 1009 | |
| 1010 | # Bind the queue to the exchange using routing key |
| 1011 | frame = ch.queue_bind(q_name, |
| 1012 | exchange=exg_name, |
| 1013 | routing_key=routing_key) |
| 1014 | self.assertIsInstance(frame.method, pika.spec.Queue.BindOk) |
| 1015 | |
| 1016 | # Check that the queue is empty |
| 1017 | frame = ch.queue_declare(q_name, passive=True) |
| 1018 | self.assertEqual(frame.method.message_count, 0) |
| 1019 | |
| 1020 | # Deposit a message in the queue |
| 1021 | ch.basic_publish(exg_name, |
| 1022 | routing_key, |
| 1023 | body='TestQueueBindAndUnbindAndPurge', |
| 1024 | mandatory=True) |
| 1025 | |
| 1026 | # Check that the queue now has one message |
| 1027 | frame = ch.queue_declare(q_name, passive=True) |
| 1028 | self.assertEqual(frame.method.message_count, 1) |
| 1029 | |
| 1030 | # Unbind the queue |
| 1031 | frame = ch.queue_unbind(queue=q_name, |
| 1032 | exchange=exg_name, |
| 1033 | routing_key=routing_key) |
| 1034 | self.assertIsInstance(frame.method, pika.spec.Queue.UnbindOk) |
| 1035 | |
| 1036 | # Verify that the queue is now unreachable via that binding |
| 1037 | with self.assertRaises(pika.exceptions.UnroutableError): |
| 1038 | ch.basic_publish(exg_name, |
| 1039 | routing_key, |
| 1040 | body='TestQueueBindAndUnbindAndPurge-2', |
| 1041 | mandatory=True) |
| 1042 | |
| 1043 | # Purge the queue and verify that 1 message was purged |
| 1044 | frame = ch.queue_purge(q_name) |
| 1045 | self.assertIsInstance(frame.method, pika.spec.Queue.PurgeOk) |
nothing calls this directly
no test coverage detected