Class to manage acquiring from and releasing connections to a pool.
| 384 | |
| 385 | |
class PoolConnectionWrapper(Generic[T_conn]):
    """Async context manager that borrows a connection from a client's pool.

    Entering the context makes sure the client's pool exists, acquires a
    connection from it and returns that connection; leaving the context
    hands the same connection back to the pool.
    """

    __slots__ = ("client", "connection", "_pool_init_lock")

    def __init__(self, client: BaseDBAsyncClient, pool_init_lock: asyncio.Lock) -> None:
        """Store the client and the lock that serialises pool creation.

        Args:
            client: Client whose ``_pool`` is used to acquire and release
                connections.
            pool_init_lock: Lock held while the pool is being created.
        """
        self._pool_init_lock = pool_init_lock
        # Populated by ``__aenter__``; ``None`` until a connection is acquired.
        self.connection: T_conn | None = None
        self.client = client

    async def ensure_connection(self) -> None:
        """Create the client's pool if it has not been created yet."""
        if self.client._pool:
            return
        # A safeguard against multiple concurrent tasks trying to initialize
        # the pool: only one task at a time may pass this point.
        async with self._pool_init_lock:
            # Check again now that the lock is held; another task may have
            # created the pool while this one was waiting.
            if self.client._pool:
                return
            await self.client.create_connection(with_db=True)

    async def __aenter__(self) -> T_conn:
        """Acquire a connection from the pool and return it."""
        await self.ensure_connection()
        pool = self.client._pool
        # Get the first available connection. If none is available, wait
        # until one is released.
        acquired = await pool.acquire()
        self.connection = acquired
        return cast(T_conn, acquired)

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Release the held connection back to the pool."""
        pool = self.client._pool
        await pool.release(self.connection)
no outgoing calls
no test coverage detected
searching dependent graphs…