| 16 | |
| 17 | |
| 18 | class SqliteSchemaEditor(SqliteQuotingMixin, BaseSchemaEditor): |
| 19 | DIALECT = "sqlite" |
| 20 | DELETE_TABLE_TEMPLATE = "DROP TABLE {table}" |
| 21 | DELETE_FIELD_TEMPLATE = 'ALTER TABLE {table} DROP COLUMN "{column}"' |
| 22 | DROP_INDEX_TEMPLATE = 'DROP INDEX "{name}"' |
| 23 | RENAME_INDEX_TEMPLATE = None |
| 24 | |
@classmethod
def _get_escape_translation_table(cls) -> list[str]:
    """Return the parent's escape table with SQLite-specific overrides.

    Starting from the table produced by the parent class, double and
    single quotes are mapped to themselves, while a forward slash is
    mapped to a backslash followed by the slash.
    """
    escapes = super()._get_escape_translation_table()
    overrides = (
        ('"', '"'),
        ("'", "'"),
        ("/", "\\/"),
    )
    # The table is indexed by code point, hence the ord() on each key.
    for char, replacement in overrides:
        escapes[ord(char)] = replacement
    return escapes
| 32 | |
async def _run_sql(self, sql: str) -> None:
    """Execute DDL SQL on SQLite.

    In collect_sql mode, delegates to the base class to collect the SQL
    without splitting or executing it.

    In atomic mode, runs one execute_query() per statement because
    sqlite3.executescript() issues an implicit COMMIT. Statement
    boundaries are detected with sqlite3.complete_statement() rather
    than a bare split on ";", so semicolons inside string literals,
    quoted identifiers, comments or trigger bodies do not cut a
    statement in half.

    Otherwise the whole script is handed to execute_script() unchanged.

    Args:
        sql: One or more SQL statements separated by semicolons.
    """
    if self.collect_sql:
        await super()._run_sql(sql)
        return
    if not self.atomic_migration:
        await self.client.execute_script(sql)
        return

    # Local import keeps this block self-contained; sqlite3 is stdlib.
    import sqlite3

    fragments = sql.split(";")
    last_index = len(fragments) - 1
    pending = ""
    for index, fragment in enumerate(fragments):
        if index == last_index:
            # Text after the final ";" (possibly empty) has no terminator.
            pending += fragment
            break
        pending += fragment + ";"
        # A ";" inside a literal/comment/trigger body leaves the buffer
        # incomplete, so keep accumulating until SQLite agrees it is whole.
        if not sqlite3.complete_statement(pending):
            continue
        # Drop the terminator so execute_query() receives the same text
        # shape as before (statement without a trailing semicolon).
        statement = pending.strip().rstrip(";").rstrip()
        pending = ""
        if statement:
            await self.client.execute_query(statement)

    # Unterminated trailing statement: run it as-is, as the original did.
    trailing = pending.strip()
    if trailing:
        await self.client.execute_query(trailing)
| 52 | |
def _get_table_comment_sql(self, table: str, comment: str) -> str:
    """Render a table comment as an inline SQL block comment.

    Args:
        table: Table name; not used by this implementation.
        comment: Raw comment text, passed through ``_escape_comment``.

    Returns:
        The escaped comment wrapped in a block comment, with a leading
        space so it can be appended directly to a SQL fragment.
    """
    escaped = self._escape_comment(comment)
    return " /* {} */".format(escaped)
| 55 | |
def _get_column_comment_sql(self, table: str, column: str, comment: str) -> str:
    """Render a column comment as an inline SQL block comment.

    Args:
        table: Table name; not used by this implementation.
        column: Column name; not used by this implementation.
        comment: Raw comment text, passed through ``_escape_comment``.

    Returns:
        The escaped comment wrapped in a block comment, with a leading
        space so it can be appended directly to a SQL fragment.
    """
    escaped = self._escape_comment(comment)
    return " /* {} */".format(escaped)
| 58 | |
| 59 | async def add_field(self, model, field_name: str) -> None: |
| 60 | field = model._meta.fields_map[field_name] |
| 61 | if isinstance(field, ManyToManyFieldInstance): |
| 62 | table_string = self._get_m2m_table_definition(model, field) |
| 63 | if table_string: |
| 64 | await self._run_sql(table_string) |
| 65 | return |
| 66 | qualified_table = self._qualify_table_name(model._meta.db_table, model._meta.schema) |
| 67 | if isinstance(field, ForeignKeyFieldInstance): |
| 68 | key_field_name = field.source_field or field_name |
| 69 | db_field = model._meta.fields_db_projection.get(key_field_name, key_field_name) |
| 70 | key_field = model._meta.fields_map[key_field_name] |
| 71 | fk_field = cast(ForeignKeyFieldInstance, key_field.reference) |
| 72 | comment = ( |
| 73 | self._get_column_comment_sql( |
| 74 | table=qualified_table, |
| 75 | column=db_field, |
no outgoing calls
searching dependent graphs…