MCPcopy
hub / github.com/dataelement/Clawith / get_db

Function get_db

backend/app/database.py:30–41  ·  view source on GitHub ↗

Dependency for getting async database sessions.

()

Source from the content-addressed store, hash-verified

28
29
30async def get_db() -> AsyncGenerator[AsyncSession, None]:
31 """Dependency for getting async database sessions."""
32 async with async_session() as session:
33 token = _session_ctx.set(session)
34 try:
35 yield session
36 await session.commit()
37 except Exception:
38 await session.rollback()
39 raise
40 finally:
41 _session_ctx.reset(token)
42
43
44_session_ctx: ContextVar[AsyncSession | None] = ContextVar("db_session_ctx", default=None)

Callers

nothing calls this directly

Calls 2

rollbackMethod · 0.80
commitMethod · 0.45

Tested by

no test coverage detected