MCPcopy
hub / github.com/pika/pika / test

Method test

tests/acceptance/blocking_adapter_test.py:1058–1117  ·  view source on GitHub ↗

BlockingChannel.basic_get

(self)

Source from the content-addressed store, hash-verified

1056 LOGGER.info('%s TEARING DOWN (%s)', datetime.now(timezone.utc), self)
1057
1058 def test(self):
1059 """BlockingChannel.basic_get"""
1060 LOGGER.info('%s STARTED (%s)', datetime.now(timezone.utc), self)
1061
1062 connection = self._connect()
1063 LOGGER.info('%s CONNECTED (%s)', datetime.now(timezone.utc), self)
1064
1065 ch = connection.channel()
1066 LOGGER.info('%s CREATED CHANNEL (%s)', datetime.now(timezone.utc), self)
1067
1068 q_name = 'TestBasicGet_q' + uuid.uuid1().hex
1069
1070 # Place channel in publisher-acknowledgments mode so that the message
1071 # may be delivered synchronously to the queue by publishing it with
1072 # mandatory=True
1073 ch.confirm_delivery()
1074 LOGGER.info('%s ENABLED PUB-ACKS (%s)', datetime.now(timezone.utc),
1075 self)
1076
1077 # Declare a new queue
1078 ch.queue_declare(q_name, exclusive=True)
1079 LOGGER.info('%s DECLARED QUEUE (%s)', datetime.now(timezone.utc), self)
1080
1081 # Verify result of getting a message from an empty queue
1082 msg = ch.basic_get(q_name, auto_ack=False)
1083 self.assertTupleEqual(msg, (None, None, None))
1084 LOGGER.info('%s GOT FROM EMPTY QUEUE (%s)', datetime.now(timezone.utc),
1085 self)
1086
1087 body = 'TestBasicGet'
1088 # Deposit a message in the queue via default exchange
1089 ch.basic_publish(exchange='',
1090 routing_key=q_name,
1091 body=body,
1092 mandatory=True)
1093 LOGGER.info('%s PUBLISHED (%s)', datetime.now(timezone.utc), self)
1094
1095 # Get the message
1096 (method, properties, body) = ch.basic_get(q_name, auto_ack=False)
1097 LOGGER.info('%s GOT FROM NON-EMPTY QUEUE (%s)',
1098 datetime.now(timezone.utc), self)
1099 self.assertIsInstance(method, pika.spec.Basic.GetOk)
1100 self.assertEqual(method.delivery_tag, 1)
1101 self.assertFalse(method.redelivered)
1102 self.assertEqual(method.exchange, '')
1103 self.assertEqual(method.routing_key, q_name)
1104 self.assertEqual(method.message_count, 0)
1105
1106 self.assertIsInstance(properties, pika.BasicProperties)
1107 self.assertIsNone(properties.headers)
1108 self.assertEqual(body, as_bytes(body))
1109
1110 # Ack it
1111 ch.basic_ack(delivery_tag=method.delivery_tag)
1112 LOGGER.info('%s ACKED (%s)', datetime.now(timezone.utc), self)
1113
1114 # Verify that the queue is now empty
1115 self._assert_exact_message_count_with_retries(channel=ch,

Callers

nothing calls this directly

Calls 9

as_bytes — Function · 0.90
_connect — Method · 0.45
channel — Method · 0.45
confirm_delivery — Method · 0.45
queue_declare — Method · 0.45
basic_get — Method · 0.45
basic_publish — Method · 0.45
basic_ack — Method · 0.45

Tested by

no test coverage detected