(self)
| 1664 | self.assertFalse(isinstance(self.con, list)) |
| 1665 | |
async def test_connection_use_after_close(self):
    """Verify that a closed connection rejects further calls.

    The connection is closed first; every call made on it afterwards
    is expected to raise ``asyncpg.InterfaceError`` with a message
    matching 'connection is closed'.
    """
    # Methods that are all invoked the same way: awaited with a single
    # query string argument.
    query_methods = ('execute', 'fetch', 'fetchval', 'fetchrow',
                     'prepare', 'cursor')

    def expect_closed_error():
        # Build a fresh assertion context for each individual call so
        # that every call is checked on its own.
        return self.assertRaisesRegex(
            asyncpg.InterfaceError, 'connection is closed')

    conn = self.con
    await conn.close()

    with expect_closed_error():
        await conn.add_listener('aaa', lambda: None)

    # transaction() is called without awaiting: the error is expected
    # from the call itself.
    with expect_closed_error():
        conn.transaction()

    with expect_closed_error():
        await conn.executemany('SELECT 1', [])

    with expect_closed_error():
        await conn.set_type_codec('aaa', encoder=None, decoder=None)

    with expect_closed_error():
        await conn.set_builtin_type_codec('aaa', codec_name='aaa')

    for method_name in query_methods:
        bound_method = getattr(conn, method_name)
        with expect_closed_error():
            await bound_method('SELECT 1')

    with expect_closed_error():
        await conn.reset()
| 1696 | |
| 1697 | @unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster') |
| 1698 | async def test_connection_ssl_to_no_ssl_server(self): |
nothing calls this directly
no test coverage detected