Connect to the database by creating the async engine.
(self)
| 43 | self._global_transaction: Optional[Transaction] = None |
| 44 | |
| 45 | async def connect(self) -> None: |
| 46 | """Connect to the database by creating the async engine (only if none exists yet).""" |
| 47 | if self._engine is None:  # skip engine setup when an engine already exists |
| 48 | self._engine = create_async_engine(self._url, **self._options) |
| 49 | # View of the same engine/pool in AUTOCOMMIT mode. Standalone |
| 50 | # queries use this to avoid a BEGIN/COMMIT round-trip per call, |
| 51 | # matching legacy `databases`-library semantics. Explicit |
| 52 | # Transactions keep using ``self._engine`` so BEGIN / SAVEPOINT |
| 53 | # still work. |
| 54 | self._autocommit_engine = self._engine.execution_options( |
| 55 | isolation_level="AUTOCOMMIT" |
| 56 | ) |
| 57 | |
| 58 | # SQLite only: register a "connect" listener that issues PRAGMA foreign_keys=ON |
| 59 | if self._engine.dialect.name == "sqlite": # pragma: nocover |
| 60 | |
| 61 | @event.listens_for(self._engine.sync_engine, "connect") |
| 62 | def set_sqlite_pragma(dbapi_conn: Any, connection_record: Any) -> None: |
| 63 | cursor = dbapi_conn.cursor() |
| 64 | cursor.execute("PRAGMA foreign_keys=ON") |
| 65 | cursor.close()  # NOTE(review): skipped if execute() raises; consider try/finally |
| 66 | |
| 67 | if self._force_rollback:  # force_rollback mode: open one global Transaction |
| 68 | assert self._global_transaction is None  # NOTE(review): assert is stripped under python -O |
| 69 | self._global_transaction = Transaction( |
| 70 | self, force_rollback=self._force_rollback |
| 71 | ) |
| 72 | await self._global_transaction.__aenter__()  # entered here; no matching __aexit__ in this method |
| 73 | |
| 74 | async def disconnect(self) -> None: |
| 75 | """ |
no test coverage detected