(
self,
model: type[Model],
db: BaseDBAsyncClient,
prefetch_map: dict[str, set[str | Prefetch]] | None = None,
prefetch_queries: dict[str, list[tuple[str | None, QuerySet]]] | None = None,
select_related_idx: (
list[tuple[type[Model], int, str, type[Model], Iterable[str | None]]] | None
) = None,
)
| 42 | DB_NATIVE = {bytes, str, int, float, decimal.Decimal, datetime.datetime, datetime.date} |
| 43 | |
| 44 | def __init__( |
| 45 | self, |
| 46 | model: type[Model], |
| 47 | db: BaseDBAsyncClient, |
| 48 | prefetch_map: dict[str, set[str | Prefetch]] | None = None, |
| 49 | prefetch_queries: dict[str, list[tuple[str | None, QuerySet]]] | None = None, |
| 50 | select_related_idx: ( |
| 51 | list[tuple[type[Model], int, str, type[Model], Iterable[str | None]]] | None |
| 52 | ) = None, |
| 53 | ) -> None: |
| 54 | self.model = model |
| 55 | self.db: BaseDBAsyncClient = db |
| 56 | self.prefetch_map = prefetch_map or {} |
| 57 | self._prefetch_queries = prefetch_queries or {} |
| 58 | self.select_related_idx = select_related_idx |
| 59 | key = (self.db.connection_name, self.model._meta.schema, self.model._meta.db_table) |
| 60 | if key not in EXECUTOR_CACHE: |
| 61 | self.regular_columns, columns = self._prepare_insert_columns() |
| 62 | self.insert_query = str(self._prepare_insert_statement(columns)) |
| 63 | self.regular_columns_all = self.regular_columns |
| 64 | self.insert_query_all = self.insert_query |
| 65 | if self.model._meta.generated_db_fields: |
| 66 | self.regular_columns_all, columns_all = self._prepare_insert_columns( |
| 67 | include_generated=True |
| 68 | ) |
| 69 | self.insert_query_all = str( |
| 70 | self._prepare_insert_statement(columns_all, has_generated=False) |
| 71 | ) |
| 72 | |
| 73 | table = self.model._meta.basetable |
| 74 | basequery = cast(QueryBuilder, self.model._meta.basequery) |
| 75 | self.delete_query = str( |
| 76 | basequery.where(table[self.model._meta.db_pk_column] == self.parameter(0)).delete() |
| 77 | ) |
| 78 | self.update_cache: dict[str, str] = {} |
| 79 | |
| 80 | EXECUTOR_CACHE[key] = ( |
| 81 | self.regular_columns, |
| 82 | self.insert_query, |
| 83 | self.regular_columns_all, |
| 84 | self.insert_query_all, |
| 85 | self.delete_query, |
| 86 | self.update_cache, |
| 87 | ) |
| 88 | |
| 89 | else: |
| 90 | ( |
| 91 | self.regular_columns, |
| 92 | self.insert_query, |
| 93 | self.regular_columns_all, |
| 94 | self.insert_query_all, |
| 95 | self.delete_query, |
| 96 | self.update_cache, |
| 97 | ) = EXECUTOR_CACHE[key] |
| 98 | |
| 99 | async def execute_explain(self, sql: str) -> Any: |
| 100 | sql = " ".join((self.EXPLAIN_PREFIX, sql)) |
nothing calls this directly
no test coverage detected