(self, file_code: FileCodes)
| 139 | return await get_file_url(file_code.code) |
| 140 | |
async def get_file_response(self, file_code: FileCodes):
    """Build a download response for the stored file behind *file_code*.

    The file is looked up under ``self.root_path`` using the relative path
    returned by ``await file_code.get_file_path()``. The download name is
    ``file_code.prefix`` followed by ``file_code.suffix``.

    Returns:
        An ``APIResponse`` with ``code=404`` when the file is not on disk
        (either at the initial check, or because it disappeared before its
        size could be read); otherwise a ``FileResponse`` serving the file
        as an ``application/octet-stream`` attachment.
    """
    file_path = self.root_path / await file_code.get_file_path()
    if not file_path.exists():
        return APIResponse(code=404, detail="文件已过期删除")

    filename = f"{file_code.prefix}{file_code.suffix}"
    # Percent-encode every character (safe='') so the name can be carried in
    # the extended ``filename*=UTF-8''...`` form of the header.
    encoded_filename = quote(filename, safe='')
    headers = {
        "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}",
    }

    # Best effort: advertise Content-Length when the on-disk size is readable.
    try:
        headers["Content-Length"] = str(file_path.stat().st_size)
    except FileNotFoundError:
        # The file was removed between exists() and stat(); report it the
        # same way as the initial existence check instead of handing a
        # missing path to FileResponse.
        return APIResponse(code=404, detail="文件已过期删除")
    except OSError:
        # Any other filesystem error: simply do not provide Content-Length.
        pass

    return FileResponse(
        file_path,
        media_type="application/octet-stream",
        headers=headers,
        filename=filename,  # keep the original filename too, for cases that need it
    )
| 164 | |
| 165 | async def save_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes, chunk_hash: str, save_path: str): |
| 166 | """ |
nothing calls this directly
no test coverage detected