MCPcopy Index your code
hub / github.com/google/adk-python / _CommitOrderSpySession

Class _CommitOrderSpySession

tests/unittests/sessions/test_session_service.py:1257–1277  ·  view source on GitHub ↗

SQLAlchemy session spy that marks when commit() has completed.

Source from the content-addressed store, hash-verified

1255
1256
1257class _CommitOrderSpySession:
1258 """SQLAlchemy session spy that marks when commit() has completed."""
1259
1260 def __init__(self, real_session, on_committed):
1261 self._real = real_session
1262 self._on_committed = on_committed
1263
1264 async def __aenter__(self):
1265 self._real = await self._real.__aenter__()
1266 return self
1267
1268 async def __aexit__(self, *args):
1269 return await self._real.__aexit__(*args)
1270
1271 async def commit(self):
1272 result = await self._real.commit()
1273 self._on_committed()
1274 return result
1275
1276 def __getattr__(self, name):
1277 return getattr(self._real, name)
1278
1279
1280@pytest.mark.asyncio

Callers 1

_spy_factoryFunction · 0.85

Calls

no outgoing calls

Tested by 1

_spy_factoryFunction · 0.68