BlockingChannel.basic_get
(self)
| 1056 | LOGGER.info('%s TEARING DOWN (%s)', datetime.now(timezone.utc), self) |
| 1057 | |
| 1058 | def test(self): |
| 1059 | """BlockingChannel.basic_get""" |
| 1060 | LOGGER.info('%s STARTED (%s)', datetime.now(timezone.utc), self) |
| 1061 | |
| 1062 | connection = self._connect() |
| 1063 | LOGGER.info('%s CONNECTED (%s)', datetime.now(timezone.utc), self) |
| 1064 | |
| 1065 | ch = connection.channel() |
| 1066 | LOGGER.info('%s CREATED CHANNEL (%s)', datetime.now(timezone.utc), self) |
| 1067 | |
| 1068 | q_name = 'TestBasicGet_q' + uuid.uuid1().hex |
| 1069 | |
| 1070 | # Place channel in publisher-acknowledgments mode so that the message |
| 1071 | # may be delivered synchronously to the queue by publishing it with |
| 1072 | # mandatory=True |
| 1073 | ch.confirm_delivery() |
| 1074 | LOGGER.info('%s ENABLED PUB-ACKS (%s)', datetime.now(timezone.utc), |
| 1075 | self) |
| 1076 | |
| 1077 | # Declare a new queue |
| 1078 | ch.queue_declare(q_name, exclusive=True) |
| 1079 | LOGGER.info('%s DECLARED QUEUE (%s)', datetime.now(timezone.utc), self) |
| 1080 | |
| 1081 | # Verify result of getting a message from an empty queue |
| 1082 | msg = ch.basic_get(q_name, auto_ack=False) |
| 1083 | self.assertTupleEqual(msg, (None, None, None)) |
| 1084 | LOGGER.info('%s GOT FROM EMPTY QUEUE (%s)', datetime.now(timezone.utc), |
| 1085 | self) |
| 1086 | |
| 1087 | body = 'TestBasicGet' |
| 1088 | # Deposit a message in the queue via default exchange |
| 1089 | ch.basic_publish(exchange='', |
| 1090 | routing_key=q_name, |
| 1091 | body=body, |
| 1092 | mandatory=True) |
| 1093 | LOGGER.info('%s PUBLISHED (%s)', datetime.now(timezone.utc), self) |
| 1094 | |
| 1095 | # Get the message |
| 1096 | (method, properties, body) = ch.basic_get(q_name, auto_ack=False) |
| 1097 | LOGGER.info('%s GOT FROM NON-EMPTY QUEUE (%s)', |
| 1098 | datetime.now(timezone.utc), self) |
| 1099 | self.assertIsInstance(method, pika.spec.Basic.GetOk) |
| 1100 | self.assertEqual(method.delivery_tag, 1) |
| 1101 | self.assertFalse(method.redelivered) |
| 1102 | self.assertEqual(method.exchange, '') |
| 1103 | self.assertEqual(method.routing_key, q_name) |
| 1104 | self.assertEqual(method.message_count, 0) |
| 1105 | |
| 1106 | self.assertIsInstance(properties, pika.BasicProperties) |
| 1107 | self.assertIsNone(properties.headers) |
| 1108 | self.assertEqual(body, as_bytes(body)) |
| 1109 | |
| 1110 | # Ack it |
| 1111 | ch.basic_ack(delivery_tag=method.delivery_tag) |
| 1112 | LOGGER.info('%s ACKED (%s)', datetime.now(timezone.utc), self) |
| 1113 | |
| 1114 | # Verify that the queue is now empty |
| 1115 | self._assert_exact_message_count_with_retries(channel=ch, |
nothing calls this directly
no test coverage detected