(
self,
sql: str,
values: list | None = None,
custom_fields: list | None = None,
)
| 101 | return (await self.db.execute_query(sql))[1] |
| 102 | |
async def execute_select(
    self,
    sql: str,
    values: list | None = None,
    custom_fields: list | None = None,
) -> list:
    """Run a SELECT query and build model instances from its rows.

    Each row becomes one instance of ``self.model``. When
    ``self.select_related_idx`` is non-empty, the row is treated as the
    main model's columns followed by consecutive column groups, one per
    related model, and every related object is attached to its parent
    under the private attribute ``_<attr>``.

    Args:
        sql: Query text, handed to ``self.db.execute_query``.
        values: Optional query parameters, handed through with ``sql``.
        custom_fields: Optional row keys whose values are copied onto each
            top-level instance as plain attributes.

    Returns:
        The list of top-level instances, after
        ``self._execute_prefetch_queries`` has been awaited on it.
    """
    _, raw_results = await self.db.execute_query(sql, values)
    instance_list = []
    # Read once so every row is hydrated against the same layout.
    select_related_idx = self.select_related_idx
    # Joined column key -> the text after its first "." (used as the field
    # name); cached because the same keys recur on every row.
    split_cache: dict[str, str] = {}
    for row_idx, row in enumerate(raw_results):
        if row_idx != 0 and row_idx % CHUNK_SIZE == 0:
            # Forcibly yield to the event loop to avoid blocking the event loop
            # when selecting a large number of rows
            await asyncio.sleep(0)

        if select_related_idx:
            # First entry describes the main model: how many leading columns
            # belong to it and the key it is registered under.
            _, current_idx, _, _, root_path = select_related_idx[0]
            row_items = list(dict(row).items())
            instance: Model = self.model._init_from_db(
                **dict(row_items[:current_idx])
            )
            instances: dict[Any, Any] = {root_path: instance}
            for model, index, *__, full_path in select_related_idx[1:]:
                (*parent_path, attr) = full_path
                related_items = row_items[current_idx : current_idx + index]
                # Compare against None rather than truthiness: a related row
                # whose columns are all falsy but present (0, "", False) is
                # still a real object. Only an all-None group (presumably a
                # join that matched nothing) means "no related object".
                if any(v is not None for _, v in related_items):
                    related_kwargs = {}
                    for key, value in related_items:
                        fname = split_cache.get(key)
                        if fname is None:
                            fname = split_cache[key] = key.split(".", 1)[1]
                        related_kwargs[fname] = value
                    obj = model._init_from_db(**related_kwargs)
                elif index == 0:
                    # 0 signals that an empty "filler" object should be created in the case
                    # where a field of related model is selected but model itself isn't,
                    # e.g. .only("relatedmodel__field")
                    obj = model._init_from_db()
                else:
                    obj = None
                # Attach to the parent only if the parent itself was built.
                target = instances.get(tuple(parent_path))
                if target is not None:
                    object.__setattr__(target, f"_{attr}", obj)
                if obj is not None:
                    instances[(*parent_path, attr)] = obj
                current_idx += index
        else:
            instance = self.model._init_from_db(**row)
        if custom_fields:
            for field in custom_fields:
                object.__setattr__(instance, field, row[field])
        instance_list.append(instance)
    await self._execute_prefetch_queries(instance_list)
    return instance_list
| 156 | |
| 157 | def _prepare_insert_columns( |
| 158 | self, include_generated: bool = False |
no test coverage detected