Execute SQL statements using the schema editor.
(self, state_editor: BaseSchemaEditor, sqls)
| 1254 | await self._run_sql(state_editor, self.reverse_sql) |
| 1255 | |
| 1256 | async def _run_sql(self, state_editor: BaseSchemaEditor, sqls) -> None: |
| 1257 | """Execute SQL statements using the schema editor.""" |
| 1258 | if isinstance(sqls, (list, tuple)): |
| 1259 | for sql in sqls: |
| 1260 | params = None |
| 1261 | if isinstance(sql, (list, tuple)): |
| 1262 | elements = len(sql) |
| 1263 | if elements == 2: |
| 1264 | sql, params = sql |
| 1265 | else: |
| 1266 | raise ValueError(f"Expected a 2-tuple but got {elements}") |
| 1267 | |
| 1268 | if params: |
| 1269 | if state_editor.collect_sql: |
| 1270 | state_editor.collected_sql.append(f"{sql} -- params: {params!r}") |
| 1271 | else: |
| 1272 | await state_editor.client.execute_query(sql, params) |
| 1273 | else: |
| 1274 | await state_editor._run_sql(sql) |
| 1275 | elif sqls != RunSQL.noop: |
| 1276 | await state_editor._run_sql(sqls) |
no test coverage detected