(self)
| 2243 | class TestConnectionGC(tb.ClusterTestCase): |
| 2244 | |
async def _run_no_explicit_close_test(self):
    """Check that a connection dropped without an explicit close is freed.

    The cyclic garbage collector is switched off for the duration of the
    check, so the weak references below can only go dead through plain
    reference counting.  The collector's previous state is restored on
    exit, whether or not an assertion fails.
    """
    restore_gc = gc.isenabled()
    gc.disable()
    try:
        connection = await self.connect()
        await connection.fetchval("select 123")

        # Keep a strong reference to the protocol only; the connection
        # itself is tracked through a weak reference from here on.
        protocol = connection._protocol
        connection_ref = weakref.ref(connection)
        del connection

        # Dropping the last strong reference must free the connection
        # immediately and leave its protocol closed.
        self.assertIsNone(connection_ref())
        self.assertTrue(protocol.is_closed())

        # Yield to the event loop once:
        # asyncio.selector_events._SelectorSocketTransport needs a chance
        # to close itself and let go of its reference to the protocol.
        await asyncio.sleep(0)
        protocol_ref = weakref.ref(protocol)
        del protocol
        self.assertIsNone(protocol_ref())
    finally:
        # Re-enable the collector only if it was running on entry.
        if restore_gc:
            gc.enable()
| 2267 | |
| 2268 | async def test_no_explicit_close_no_debug(self): |
| 2269 | olddebug = self.loop.get_debug() |
no test coverage detected