MCPcopy
hub / github.com/MagicStack/asyncpg / test_nested_isolation_level

Method test_nested_isolation_level

tests/test_transaction.py:214–250  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

212 await self.con.reset()
213
async def test_nested_isolation_level(self):
    # Exercise every (inner, outer, implicit) combination as a subtest.
    # The outer transaction's isolation level is either passed explicitly
    # or left as None after the level has been set through the session
    # default.  An inner transaction naming a different level than the
    # outer one must be rejected with asyncpg.InterfaceError; an inner
    # transaction with no level, or the same level, must nest cleanly.
    session_default_sql = (
        'SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL '
    )
    # ``isolation=`` argument value -> spelling used in the SQL statement.
    sql_spelling = {
        'read_committed': 'read committed',
        'read_uncommitted': 'read uncommitted',
        'repeatable_read': 'repeatable read',
        'serializable': 'serializable',
    }
    # None stands for "inner transaction does not request a level".
    inner_choices = [None]
    inner_choices.extend(sql_spelling)

    for inner in inner_choices:
        for outer, outer_sql in sql_spelling.items():
            for implicit in (False, True):
                with self.subTest(
                    implicit=implicit, outer=outer, inner=inner,
                ):
                    if implicit:
                        # Establish the outer level via the session
                        # default instead of the transaction() argument.
                        await self.con.execute(
                            session_default_sql + outer_sql
                        )
                    outer_arg = None if implicit else outer
                    levels_conflict = bool(inner) and inner != outer

                    async with self.con.transaction(isolation=outer_arg):
                        if not levels_conflict:
                            async with self.con.transaction(
                                isolation=inner,
                            ):
                                pass
                        else:
                            expected_message = (
                                'current {!r} != outer {!r}'.format(
                                    inner, outer
                                )
                            )
                            with self.assertRaisesRegex(
                                asyncpg.InterfaceError,
                                expected_message,
                            ):
                                async with self.con.transaction(
                                    isolation=inner,
                                ):
                                    pass

Callers

nothing calls this directly

Calls 2

transactionMethod · 0.80
executeMethod · 0.45

Tested by

no test coverage detected