Get connection, reusing transaction connection if in transaction. :param transactional: If True, open an explicit transaction for the scope of the executor. Use for server-side cursors (``iterate``) and multi-row ``execute_many`` batches, which need a real
(
self, *, transactional: bool = False
)
| 130 | |
| 131 | @asynccontextmanager |
| 132 | async def get_query_executor( |
| 133 | self, *, transactional: bool = False |
| 134 | ) -> AsyncIterator[QueryExecutor]: |
| 135 | """ |
| 136 | Get connection, reusing transaction connection if in transaction. |
| 137 | |
| 138 | :param transactional: If True, open an explicit transaction for the |
| 139 | scope of the executor. Use for server-side cursors (``iterate``) |
| 140 | and multi-row ``execute_many`` batches, which need a real |
| 141 | transaction either because AUTOCOMMIT would commit each row |
| 142 | separately or because the driver (asyncpg) requires a |
| 143 | transaction for streaming. |
| 144 | :type transactional: bool |
| 145 | :return: QueryExecutor wrapping a connection |
| 146 | :rtype: QueryExecutor |
| 147 | """ |
| 148 | trans_conn = self.get_transaction_connection() |
| 149 | if trans_conn is not None: |
| 150 | # Inside a transaction - reuse the transaction's connection |
| 151 | yield QueryExecutor(trans_conn) |
| 152 | elif transactional: |
| 153 | async with self.transaction(): |
| 154 | conn = self.get_transaction_connection() |
| 155 | assert conn is not None |
| 156 | yield QueryExecutor(conn) |
| 157 | else: |
| 158 | # Outside transaction: use AUTOCOMMIT view so each statement |
| 159 | # commits at the driver level with no extra BEGIN/COMMIT |
| 160 | # round-trip. |
| 161 | assert self._autocommit_engine is not None |
| 162 | async with self._autocommit_engine.connect() as conn: |
| 163 | yield QueryExecutor(conn) |
| 164 | |
| 165 | def get_transaction_connection(self) -> Optional[AsyncConnection]: |
| 166 | """Get the current transaction connection if in a transaction.""" |
no test coverage detected