(self, query: str, values: list)
| 123 | |
| 124 | @translate_exceptions |
| 125 | async def execute_many(self, query: str, values: list) -> None: |
| 126 | async with self.acquire_connection() as connection: |
| 127 | self.log.debug("%s: %s", query, values) |
| 128 | async with connection.cursor() as cursor: |
| 129 | try: |
| 130 | await cursor.executemany(query, values) |
| 131 | except Exception: |
| 132 | await cursor.rollback() |
| 133 | raise |
| 134 | else: |
| 135 | await cursor.commit() |
| 136 | |
| 137 | @translate_exceptions |
| 138 | async def execute_query(self, query: str, values: list | None = None) -> tuple[int, list[dict]]: |
no test coverage detected