(
self,
instances: tuple[MODEL, ...] | None = None,
using_db: BaseDBAsyncClient | None = None,
)
| 236 | await self._remove_or_clear(instances, using_db) |
| 237 | |
| 238 | async def _remove_or_clear( |
| 239 | self, |
| 240 | instances: tuple[MODEL, ...] | None = None, |
| 241 | using_db: BaseDBAsyncClient | None = None, |
| 242 | ) -> None: |
| 243 | db = using_db or self.remote_model._meta.db |
| 244 | through_table = Table(self.field.through, schema=self.field.through_schema) |
| 245 | pk_formatting_func = type(self.instance)._meta.pk.to_db_value |
| 246 | |
| 247 | condition = through_table[self.field.backward_key] == pk_formatting_func( |
| 248 | self.instance.pk, self.instance |
| 249 | ) |
| 250 | if instances: |
| 251 | related_pk_formatting_func = type(instances[0])._meta.pk.to_db_value |
| 252 | if len(instances) == 1: |
| 253 | condition &= through_table[self.field.forward_key] == related_pk_formatting_func( |
| 254 | instances[0].pk, instances[0] |
| 255 | ) |
| 256 | else: |
| 257 | condition &= through_table[self.field.forward_key].isin( |
| 258 | [related_pk_formatting_func(i.pk, i) for i in instances] |
| 259 | ) |
| 260 | query = db.query_class.from_(through_table).where(condition).delete() |
| 261 | await db.execute_query(*query.get_parameterized_sql()) |
| 262 | |
| 263 | |
| 264 | class RelationalField(Field[MODEL]): |
no test coverage detected