(self, file_ids: list[int], update_data: dict[str, Any])
| 149 | } |
| 150 | |
| 151 | async def update_files(self, file_ids: list[int], update_data: dict[str, Any]): |
| 152 | unique_ids = list(dict.fromkeys(file_ids)) |
| 153 | updated = [] |
| 154 | failed = [] |
| 155 | missing = [] |
| 156 | |
| 157 | for file_id in unique_ids: |
| 158 | file_code = await FileCodes.filter(id=file_id).first() |
| 159 | if not file_code: |
| 160 | missing.append(file_id) |
| 161 | continue |
| 162 | |
| 163 | try: |
| 164 | await file_code.update_from_dict(update_data).save() |
| 165 | updated.append(file_id) |
| 166 | except Exception as exc: |
| 167 | failed.append({"id": file_id, "reason": str(exc)}) |
| 168 | |
| 169 | if updated: |
| 170 | await self.record_admin_activity( |
| 171 | action="files.batch_update", |
| 172 | target_type="file", |
| 173 | count=len(updated), |
| 174 | meta={ |
| 175 | "fields": sorted(update_data.keys()), |
| 176 | "requestedCount": len(file_ids), |
| 177 | "uniqueCount": len(unique_ids), |
| 178 | "updated": updated, |
| 179 | "missing": missing, |
| 180 | "failedCount": len(failed), |
| 181 | }, |
| 182 | ) |
| 183 | |
| 184 | return { |
| 185 | "requestedCount": len(file_ids), |
| 186 | "requested_count": len(file_ids), |
| 187 | "uniqueCount": len(unique_ids), |
| 188 | "unique_count": len(unique_ids), |
| 189 | "updatedCount": len(updated), |
| 190 | "updated_count": len(updated), |
| 191 | "missingCount": len(missing), |
| 192 | "missing_count": len(missing), |
| 193 | "failedCount": len(failed), |
| 194 | "failed_count": len(failed), |
| 195 | "updated": updated, |
| 196 | "missing": missing, |
| 197 | "failed": failed, |
| 198 | } |
| 199 | |
| 200 | async def apply_file_policy_action( |
| 201 | self, |
no test coverage detected