MCPcopy
hub / github.com/ormar-orm/ormar / DatabaseConnection

Class DatabaseConnection

ormar/databases/connection.py:20–180  ·  view source on GitHub ↗

Wrapper around SQLAlchemy AsyncEngine that provides a databases-compatible API.

Source from the content-addressed store, hash-verified

18
19
20class DatabaseConnection:
21 """
22 Wrapper around SQLAlchemy AsyncEngine that provides a databases-compatible API.
23 """
24
25 def __init__(self, url: str, **options: Any) -> None:
26 """
27 Initialize database connection.
28
29 :param url: Database URL with an async driver (e.g., postgresql+asyncpg://)
30 :param options: Additional engine options
31 """
32 self._force_rollback = options.pop("force_rollback", False)
33 self._url = url
34 # Set reasonable pool defaults if not provided
35 if "pool_size" not in options:
36 options["pool_size"] = 5
37 if "max_overflow" not in options:
38 options["max_overflow"] = 10
39 self._options = options
40 self._engine: Optional[AsyncEngine] = None
41 self._autocommit_engine: Optional[AsyncEngine] = None
42
43 self._global_transaction: Optional[Transaction] = None
44
45 async def connect(self) -> None:
46 """Connect to the database by creating the async engine."""
47 if self._engine is None:
48 self._engine = create_async_engine(self._url, **self._options)
49 # View of the same engine/pool in AUTOCOMMIT mode. Standalone
50 # queries use this to avoid a BEGIN/COMMIT round-trip per call,
51 # matching legacy `databases`-library semantics. Explicit
52 # Transactions keep using ``self._engine`` so BEGIN / SAVEPOINT
53 # still work.
54 self._autocommit_engine = self._engine.execution_options(
55 isolation_level="AUTOCOMMIT"
56 )
57
58 # Set up SQLite foreign keys pragma if using SQLite
59 if self._engine.dialect.name == "sqlite": # pragma: nocover
60
61 @event.listens_for(self._engine.sync_engine, "connect")
62 def set_sqlite_pragma(dbapi_conn: Any, connection_record: Any) -> None:
63 cursor = dbapi_conn.cursor()
64 cursor.execute("PRAGMA foreign_keys=ON")
65 cursor.close()
66
67 if self._force_rollback:
68 assert self._global_transaction is None
69 self._global_transaction = Transaction(
70 self, force_rollback=self._force_rollback
71 )
72 await self._global_transaction.__aenter__()
73
74 async def disconnect(self) -> None:
75 """
76 Disconnect from the database and dispose of the engine.
77 In case of force_rollback, also roll back the global transaction.

Callers 15

docs009.pyFile · 0.90
docs008.pyFile · 0.90
docs007.pyFile · 0.90
docs003.pyFile · 0.90
docs006.pyFile · 0.90
docs002.pyFile · 0.90
docs004.pyFile · 0.90
docs001.pyFile · 0.90
docs005.pyFile · 0.90
docs001.pyFile · 0.90
docs001.pyFile · 0.90
docs002.pyFile · 0.90

Calls

no outgoing calls