Wrapper around SQLAlchemy AsyncEngine that provides a databases-compatible API.
| 18 | |
| 19 | |
| 20 | class DatabaseConnection: |
| 21 | """ |
| 22 | Wrapper around SQLAlchemy AsyncEngine that provides a databases-compatible API. |
| 23 | """ |
| 24 | |
| 25 | def __init__(self, url: str, **options: Any) -> None: |
| 26 | """ |
| 27 | Initialize database connection. |
| 28 | |
| 29 | :param url: Database URL with an async driver (e.g., postgresql+asyncpg://) |
| 30 | :param options: Additional engine options |
| 31 | """ |
| 32 | self._force_rollback = options.pop("force_rollback", False) |
| 33 | self._url = url |
| 34 | # Set reasonable pool defaults if not provided |
| 35 | if "pool_size" not in options: |
| 36 | options["pool_size"] = 5 |
| 37 | if "max_overflow" not in options: |
| 38 | options["max_overflow"] = 10 |
| 39 | self._options = options |
| 40 | self._engine: Optional[AsyncEngine] = None |
| 41 | self._autocommit_engine: Optional[AsyncEngine] = None |
| 42 | |
| 43 | self._global_transaction: Optional[Transaction] = None |
| 44 | |
| 45 | async def connect(self) -> None: |
| 46 | """Connect to the database by creating the async engine.""" |
| 47 | if self._engine is None: |
| 48 | self._engine = create_async_engine(self._url, **self._options) |
| 49 | # View of the same engine/pool in AUTOCOMMIT mode. Standalone |
| 50 | # queries use this to avoid a BEGIN/COMMIT round-trip per call, |
| 51 | # matching legacy `databases`-library semantics. Explicit |
| 52 | # Transactions keep using ``self._engine`` so BEGIN / SAVEPOINT |
| 53 | # still work. |
| 54 | self._autocommit_engine = self._engine.execution_options( |
| 55 | isolation_level="AUTOCOMMIT" |
| 56 | ) |
| 57 | |
| 58 | # Set up SQLite foreign keys pragma if using SQLite |
| 59 | if self._engine.dialect.name == "sqlite": # pragma: nocover |
| 60 | |
| 61 | @event.listens_for(self._engine.sync_engine, "connect") |
| 62 | def set_sqlite_pragma(dbapi_conn: Any, connection_record: Any) -> None: |
| 63 | cursor = dbapi_conn.cursor() |
| 64 | cursor.execute("PRAGMA foreign_keys=ON") |
| 65 | cursor.close() |
| 66 | |
| 67 | if self._force_rollback: |
| 68 | assert self._global_transaction is None |
| 69 | self._global_transaction = Transaction( |
| 70 | self, force_rollback=self._force_rollback |
| 71 | ) |
| 72 | await self._global_transaction.__aenter__() |
| 73 | |
| 74 | async def disconnect(self) -> None: |
| 75 | """ |
| 76 | Disconnect from the database and dispose of the engine. |
| 77 | In case of force_rollback, also roll back the global transaction. |
no outgoing calls