MCPcopy
hub / github.com/tortoise/tortoise-orm / execute_query

Method execute_query

tortoise/backends/odbc/client.py:138–155  ·  view source on GitHub ↗
(self, query: str, values: list | None = None)

Source from the content-addressed store, hash-verified

136
137 @translate_exceptions
138 async def execute_query(self, query: str, values: list | None = None) -> tuple[int, list[dict]]:
139 async with self.acquire_connection() as connection:
140 self.log.debug("%s: %s", query, values)
141 async with connection.cursor() as cursor:
142 if values:
143 await cursor.execute(query, values)
144 else:
145 await cursor.execute(query)
146 if query.startswith("UPDATE") or query.startswith("DELETE"):
147 return cursor.rowcount, []
148 try:
149 rows = await cursor.fetchall()
150 except pyodbc.ProgrammingError:
151 return cursor.rowcount, []
152 if rows:
153 fields = [c[0] for c in cursor.description]
154 return cursor.rowcount, [dict(zip(fields, row)) for row in rows]
155 return cursor.rowcount, []
156
157 async def execute_query_dict(self, query: str, values: list | None = None) -> list[dict]:
158 return (await self.execute_query(query, values))[1]

Callers 15

execute_query_dictMethod · 0.95
_executeMethod · 0.45
_executeMethod · 0.45
_executeMethod · 0.45
_executeMethod · 0.45
_executeMethod · 0.45
_execute_manyMethod · 0.45
_run_sqlMethod · 0.45
applied_migrationsMethod · 0.45

Calls 1

acquire_connectionMethod · 0.95

Tested by

no test coverage detected