MCPcopy
hub / github.com/pika/pika / test

Method test

tests/acceptance/blocking_adapter_test.py:1408–1441  ·  view source on GitHub ↗

BlockingChannel.tx_commit

(self)

Source from the content-addressed store, hash-verified

1406class TestTxCommit(BlockingTestCaseBase):
1407
1408 def test(self):
1409 """BlockingChannel.tx_commit"""
1410 connection = self._connect()
1411
1412 ch = connection.channel()
1413
1414 q_name = 'TestTxCommit_q' + uuid.uuid1().hex
1415
1416 # Declare a new queue
1417 ch.queue_declare(q_name, exclusive=True)
1418
1419 # Select standard transaction mode
1420 frame = ch.tx_select()
1421 self.assertIsInstance(frame.method, pika.spec.Tx.SelectOk)
1422
1423 # Deposit a message in the queue via default exchange
1424 ch.basic_publish(exchange='',
1425 routing_key=q_name,
1426 body='TestTxCommit1',
1427 mandatory=True)
1428
1429 # Verify that queue is still empty
1430 frame = ch.queue_declare(q_name, passive=True)
1431 self.assertEqual(frame.method.message_count, 0)
1432
1433 # Commit the transaction
1434 ch.tx_commit()
1435
1436 # Verify that the queue has the expected message
1437 frame = ch.queue_declare(q_name, passive=True)
1438 self.assertEqual(frame.method.message_count, 1)
1439
1440 (_, _, rx_body) = ch.basic_get(q_name, auto_ack=False)
1441 self.assertEqual(rx_body, as_bytes('TestTxCommit1'))
1442
1443
1444class TestTxRollback(BlockingTestCaseBase):

Callers

nothing calls this directly

Calls 8

as_bytesFunction · 0.90
_connectMethod · 0.45
channelMethod · 0.45
queue_declareMethod · 0.45
tx_selectMethod · 0.45
basic_publishMethod · 0.45
tx_commitMethod · 0.45
basic_getMethod · 0.45

Tested by

no test coverage detected