Test that Connection.reset() closes any open transaction.
(self)
| 516 | self.assertEqual(len(cons), N) |
| 517 | |
async def test_pool_release_in_xact(self):
    """Test that Connection.reset() closes any open transaction.

    A connection is handed back to a one-connection pool while a
    transaction started on it is still open.  The test expects the
    loop error handler to be called with a message matching
    'an active transaction', expects the underlying connection to end
    up with no top-level transaction, and expects the next acquirer to
    receive that same underlying connection under a new transaction id.
    """
    async def current_txid(conn):
        # Transaction id as reported by the server for this query.
        return await conn.fetchval('select txid_current()')

    pool_ctx = self.create_pool(database='postgres',
                                min_size=1, max_size=1)
    async with pool_ctx as pool:
        with self.assertLoopErrorHandlerCalled('an active transaction'):
            async with pool.acquire() as proxy:
                # unwrap PoolConnectionProxy
                raw_con = proxy._con

                txid_before = await current_txid(proxy)

                xact = proxy.transaction()
                # Merely creating the transaction object must not
                # register it on the connection; start() does that.
                self.assertIsNone(proxy._con._top_xact)
                await xact.start()
                self.assertIs(raw_con._top_xact, xact)

                txid_inside = await current_txid(proxy)
                self.assertNotEqual(txid_before, txid_inside)
            # Leaving the acquire block releases the connection with
            # the transaction neither committed nor rolled back.

        # After the release, no transaction may remain attached.
        self.assertIsNone(raw_con._top_xact)

        async with pool.acquire() as proxy:
            # With max_size=1 the pool hands out the same connection.
            self.assertIs(proxy._con, raw_con)
            self.assertIsNone(proxy._con._top_xact)
            txid_after = await current_txid(proxy)
            self.assertNotEqual(txid_inside, txid_after)
| 546 | |
| 547 | async def test_pool_connection_methods(self): |
| 548 | async def test_fetch(pool): |
nothing calls this directly
no test coverage detected