MCPcopy
hub / github.com/pika/pika / test

Method test

tests/acceptance/blocking_adapter_test.py:988–1050  ·  view source on GitHub ↗

BlockingChannel: Test queue_bind and queue_unbind

(self)

Source from the content-addressed store, hash-verified

986class TestQueueBindAndUnbindAndPurge(BlockingTestCaseBase):
987
988 def test(self):
989 """BlockingChannel: Test queue_bind and queue_unbind"""
990 connection = self._connect()
991
992 ch = connection.channel()
993
994 q_name = 'TestQueueBindAndUnbindAndPurge_q' + uuid.uuid1().hex
995 exg_name = 'TestQueueBindAndUnbindAndPurge_exg_' + uuid.uuid1().hex
996 routing_key = 'TestQueueBindAndUnbindAndPurge'
997
998 # Place channel in publisher-acknowledgments mode so that we may test
999 # whether the queue is reachable by publishing with mandatory=True
1000 res = ch.confirm_delivery()
1001 self.assertIsNone(res)
1002
1003 # Declare a new exchange
1004 ch.exchange_declare(exg_name, exchange_type=ExchangeType.direct)
1005 self.addCleanup(connection.channel().exchange_delete, exg_name)
1006
1007 # Declare a new queue
1008 ch.queue_declare(q_name, exclusive=True)
1009
1010 # Bind the queue to the exchange using routing key
1011 frame = ch.queue_bind(q_name,
1012 exchange=exg_name,
1013 routing_key=routing_key)
1014 self.assertIsInstance(frame.method, pika.spec.Queue.BindOk)
1015
1016 # Check that the queue is empty
1017 frame = ch.queue_declare(q_name, passive=True)
1018 self.assertEqual(frame.method.message_count, 0)
1019
1020 # Deposit a message in the queue
1021 ch.basic_publish(exg_name,
1022 routing_key,
1023 body='TestQueueBindAndUnbindAndPurge',
1024 mandatory=True)
1025
1026 # Check that the queue now has one message
1027 frame = ch.queue_declare(q_name, passive=True)
1028 self.assertEqual(frame.method.message_count, 1)
1029
1030 # Unbind the queue
1031 frame = ch.queue_unbind(queue=q_name,
1032 exchange=exg_name,
1033 routing_key=routing_key)
1034 self.assertIsInstance(frame.method, pika.spec.Queue.UnbindOk)
1035
1036 # Verify that the queue is now unreachable via that binding
1037 with self.assertRaises(pika.exceptions.UnroutableError):
1038 ch.basic_publish(exg_name,
1039 routing_key,
1040 body='TestQueueBindAndUnbindAndPurge-2',
1041 mandatory=True)
1042
1043 # Purge the queue and verify that 1 message was purged
1044 frame = ch.queue_purge(q_name)
1045 self.assertIsInstance(frame.method, pika.spec.Queue.PurgeOk)

Callers

nothing calls this directly

Calls 9

_connectMethod · 0.45
channelMethod · 0.45
confirm_deliveryMethod · 0.45
exchange_declareMethod · 0.45
queue_declareMethod · 0.45
queue_bindMethod · 0.45
basic_publishMethod · 0.45
queue_unbindMethod · 0.45
queue_purgeMethod · 0.45

Tested by

no test coverage detected