(self, file_ids: list[int])
| 101 | ) |
| 102 | |
async def delete_files(self, file_ids: list[int]):
    """Delete several files in one call and report the outcome per id.

    Duplicate ids are collapsed (first occurrence kept, order preserved) so
    each file is looked up and deleted at most once. A failure on one id is
    recorded and does not stop the rest of the batch.

    Args:
        file_ids: Ids of the ``FileCodes`` rows to delete; may contain
            duplicates.

    Returns:
        A dict holding the lists ``deleted`` (ids that were removed),
        ``missing`` (ids with no matching row) and ``failed``
        (``{"id": ..., "reason": ...}`` entries), plus the requested /
        unique / deleted / missing / failed counts. Every count is exposed
        under both its camelCase and its snake_case key.
    """
    # dict.fromkeys de-duplicates while keeping the caller's ordering.
    unique_ids = list(dict.fromkeys(file_ids))
    deleted = []
    failed = []
    missing = []

    for file_id in unique_ids:
        file_code = await FileCodes.filter(id=file_id).first()
        if not file_code:
            missing.append(file_id)
            continue

        try:
            await self._delete_file_code(file_code)
        except Exception as exc:
            # Deliberately broad: this is a best-effort batch, so any error
            # is reported per id instead of aborting the remaining deletes.
            # str(exc) is empty for exceptions raised without a message
            # (e.g. TimeoutError()); fall back to the class name so the
            # reported reason is never blank.
            reason = str(exc) or type(exc).__name__
            failed.append({"id": file_id, "reason": reason})
        else:
            deleted.append(file_id)

    # Only write an audit entry when something was actually removed.
    if deleted:
        await self.record_admin_activity(
            action="files.batch_delete",
            target_type="file",
            count=len(deleted),
            meta={
                "requestedCount": len(file_ids),
                "uniqueCount": len(unique_ids),
                "deleted": deleted,
                "missing": missing,
                "failedCount": len(failed),
            },
        )

    # Counts are returned in both camelCase and snake_case spellings.
    return {
        "requestedCount": len(file_ids),
        "requested_count": len(file_ids),
        "uniqueCount": len(unique_ids),
        "unique_count": len(unique_ids),
        "deletedCount": len(deleted),
        "deleted_count": len(deleted),
        "missingCount": len(missing),
        "missing_count": len(missing),
        "failedCount": len(failed),
        "failed_count": len(failed),
        "deleted": deleted,
        "missing": missing,
        "failed": failed,
    }
| 150 | |
| 151 | async def update_files(self, file_ids: list[int], update_data: dict[str, Any]): |
| 152 | unique_ids = list(dict.fromkeys(file_ids)) |
no test coverage detected