(
self,
file_id: int,
action: str,
download_limit: Optional[int] = None,
)
| 198 | } |
| 199 | |
async def apply_file_policy_action(
    self,
    file_id: int,
    action: str,
    download_limit: Optional[int] = None,
) -> dict[str, Any]:
    """Apply a policy action to one file record and return its detail.

    The record is looked up by primary key, the update payload is produced
    by ``self._build_policy_action_update``, persisted, and the operation is
    logged through ``self.record_admin_activity``.

    Args:
        file_id: Primary key of the ``FileCodes`` row to modify.
        action: Name of the policy action. Surrounding whitespace is
            stripped and the value is lower-cased before it is used.
            NOTE(review): the set of accepted action names is decided by
            ``_build_policy_action_update``, which is not visible here.
        download_limit: Optional value forwarded unchanged to
            ``_build_policy_action_update``.

    Returns:
        Whatever ``self.get_file_detail(file_id)`` returns after the update.

    Raises:
        HTTPException: With status 404 when no row matches ``file_id``.
    """
    record = await FileCodes.filter(id=file_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="文件不存在")

    # Normalise once so the update builder and the audit entry see the
    # same spelling of the action.
    normalized_action = action.strip().lower()
    current_time = await get_now()

    changes = self._build_policy_action_update(
        file_code=record,
        action=normalized_action,
        now=current_time,
        download_limit=download_limit,
    )

    # Persist through the object returned by update_from_dict, exactly as
    # the chained call form would.
    updated_record = record.update_from_dict(changes)
    await updated_record.save()

    activity_name = self._build_file_activity_name(record)
    activity_meta = {"policyAction": normalized_action}
    await self.record_admin_activity(
        action="file.policy_action",
        target_type="file",
        target_id=file_id,
        target_name=activity_name,
        count=1,
        meta=activity_meta,
    )

    detail = await self.get_file_detail(file_id)
    return detail
| 229 | |
| 230 | async def get_file_metadata(self, file_id: int) -> dict[str, Any]: |
| 231 | record = await KeyValue.filter(key=self._file_metadata_key(file_id)).first() |
no test coverage detected