hub / github.com/MagicStack/asyncpg / locker

Method locker

tests/test_execute.py:275–288
()

Source from the content-addressed store, hash-verified

event = asyncio.Event()

async def locker():
    test_func = getattr(self, self._testMethodName).__func__
    opts = getattr(test_func, '__connect_options__', {})
    conn = await self.connect(**opts)
    try:
        tx = conn.transaction()
        await tx.start()
        await conn.execute("UPDATE exmany SET a = '1' WHERE b = 10")
        event.set()
        await asyncio.sleep(1)
        await tx.rollback()
    finally:
        event.set()
        await conn.close()

await self.con.executemany('''
    INSERT INTO exmany VALUES(NULL, $1)
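
The listing above opens a second connection, takes a lock inside a transaction, sets an event once the lock is held, sleeps, and rolls back. A minimal, database-free sketch of that coordination pattern follows. It is an illustration under assumptions, not the test's actual code: `asyncio.Lock` stands in for the row lock taken by the `UPDATE`, the names `lock`, `locked`, and `main` are hypothetical, and the competing operation with a short timeout is assumed, since the listing is cut off before showing how the lock is used.

```python
import asyncio


async def main():
    lock = asyncio.Lock()      # stand-in (assumption) for the row lock taken by the UPDATE
    locked = asyncio.Event()   # plays the role of `event` in the listing

    async def locker():
        try:
            async with lock:
                locked.set()              # signal that the lock is now held
                await asyncio.sleep(0.2)  # keep holding it for a while
        finally:
            locked.set()                  # never leave the waiter hanging on failure

    task = asyncio.create_task(locker())
    await locked.wait()                   # continue only once the lock is held

    timed_out = False
    try:
        # Hypothetical competing operation: needs the same lock, gives up quickly.
        await asyncio.wait_for(lock.acquire(), timeout=0.05)
        lock.release()
    except asyncio.TimeoutError:
        timed_out = True

    await task                            # let the locker finish and release
    return timed_out, lock.locked()


result = asyncio.run(main())
```

Setting the event a second time in `finally` mirrors the listing: if acquiring the lock fails, the waiting side is still released instead of blocking forever.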

Callers

nothing calls this directly

Calls 6

transaction · Method · 0.80
rollback · Method · 0.80
connect · Method · 0.45
start · Method · 0.45
execute · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected