(self, file_code: FileCodes)
| 314 | ) |
| 315 | |
| 316 | async def get_file_response(self, file_code: FileCodes): |
| 317 | try: |
| 318 | filename = file_code.prefix + file_code.suffix |
| 319 | content_length = None # 初始化为 None,表示未知大小 |
| 320 | |
| 321 | async with self._client() as s3: |
| 322 | # 尝试获取文件大小(HEAD请求) |
| 323 | try: |
| 324 | head_response = await s3.head_object( |
| 325 | Bucket=self.bucket_name, |
| 326 | Key=await file_code.get_file_path() |
| 327 | ) |
| 328 | # 从HEAD响应中获取Content-Length |
| 329 | if 'ContentLength' in head_response: |
| 330 | content_length = head_response['ContentLength'] |
| 331 | elif 'Content-Length' in head_response['ResponseMetadata']['HTTPHeaders']: |
| 332 | content_length = int(head_response['ResponseMetadata']['HTTPHeaders']['Content-Length']) |
| 333 | except Exception: |
| 334 | # 如果HEAD请求失败,则不提供 Content-Length |
| 335 | pass |
| 336 | |
| 337 | link = await s3.generate_presigned_url( |
| 338 | "get_object", |
| 339 | Params={ |
| 340 | "Bucket": self.bucket_name, |
| 341 | "Key": await file_code.get_file_path(), |
| 342 | }, |
| 343 | ExpiresIn=3600, |
| 344 | ) |
| 345 | |
| 346 | # 创建ClientSession并传递给生成器复用 |
| 347 | session = aiohttp.ClientSession() |
| 348 | |
async def stream_generator():
    """Yield the body of the presigned download in fixed-size chunks.

    Requests ``link`` through the ``session`` taken from the enclosing
    scope, and closes that session once iteration finishes or fails.

    Raises:
        HTTPException: when the upstream status is not 200; the upstream
            status code is passed through unchanged.
    """
    # NOTE(review): if this generator feeds a streaming response, the
    # status check only runs once iteration has begun -- confirm the
    # framework can still surface the error to the client at that point.
    read_size = 65536  # 64 KiB per chunk
    try:
        async with session.get(link) as resp:
            if resp.status != 200:
                raise HTTPException(
                    status_code=resp.status,
                    detail=f"从S3获取文件失败: {resp.status}"
                )
            # iter_chunked() reads up to read_size bytes at a time and
            # stops on the first empty read, i.e. at end of stream.
            async for piece in resp.content.iter_chunked(read_size):
                yield piece
    finally:
        # The session is owned by this generator; always release it.
        await session.close()
| 366 | |
| 367 | from fastapi.responses import StreamingResponse |
| 368 | encoded_filename = quote(filename, safe='') |
| 369 | headers = { |
| 370 | "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}" |
| 371 | } |
| 372 | if content_length is not None: |
| 373 | headers["Content-Length"] = str(content_length) |
no test coverage detected