合并 OpenDAL 存储上的分片文件,使用临时文件避免内存问题
(self, upload_id: str, chunk_info: UploadChunk, save_path: str)
| 948 | await self.operator.write(chunk_path, chunk_data) |
| 949 | |
async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: str) -> tuple[str, str]:
    """Merge the uploaded chunk files on OpenDAL storage into ``save_path``.

    Chunks ``0 .. chunk_info.total_chunks - 1`` are read one at a time from
    ``<parent of save_path>/chunks/<upload_id>/<index>.part``, checked against
    the SHA-256 stored on the matching ``UploadChunk`` record, and spooled to
    a local temporary file. Only after every chunk has been verified is the
    merged data copied to ``save_path``, in fixed-size blocks, so memory use
    is bounded by the largest chunk / copy block rather than by the size of
    the whole file.

    Args:
        upload_id: Identifier of the upload whose chunks are merged.
        chunk_info: Upload metadata; only ``total_chunks`` is read here.
        save_path: Destination path of the merged file on the storage.

    Returns:
        A ``(save_path, sha256_hex)`` tuple, where ``sha256_hex`` is the
        SHA-256 digest of the merged content.

    Raises:
        ValueError: If a chunk record is missing, a chunk file cannot be
            read, or a chunk's hash does not match its record.
    """
    file_sha256 = hashlib.sha256()
    chunk_dir = str(Path(save_path).parent / "chunks" / upload_id)
    # Block size for the temp-file -> storage copy.
    # NOTE(review): 8 MiB is presumably also large enough for backends with a
    # minimum multipart part size (e.g. 5 MiB on S3) — confirm for the
    # configured service.
    copy_block_size = 8 * 1024 * 1024
    merged_size = 0

    # Reserve a temp file name only; the file is reopened asynchronously below.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_path = temp_file.name

    try:
        async with aiofiles.open(temp_path, 'wb') as out_file:
            for i in range(chunk_info.total_chunks):
                chunk_path = f"{chunk_dir}/{i}.part"
                chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first()
                if not chunk_record:
                    raise ValueError(f"分片{i}记录不存在")

                try:
                    chunk_data = await self.operator.read(chunk_path)
                except Exception as e:
                    # Keep the original failure attached as the explicit cause.
                    raise ValueError(f"分片{i}文件不存在: {e}") from e

                current_hash = hashlib.sha256(chunk_data).hexdigest()
                if current_hash != chunk_record.chunk_hash:
                    raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}")

                file_sha256.update(chunk_data)
                await out_file.write(chunk_data)
                merged_size += len(chunk_data)
                del chunk_data  # drop the reference before reading the next chunk

        if merged_size == 0:
            # Nothing to stream: still create the (empty) destination object.
            await self.operator.write(save_path, b"")
        else:
            # Stream the verified data to storage block by block instead of
            # loading the whole merged file into memory.
            async with aiofiles.open(temp_path, 'rb') as merged_file:
                async with await self.operator.open(save_path, 'wb') as dest_file:
                    while block := await merged_file.read(copy_block_size):
                        await dest_file.write(block)
    finally:
        # Always remove the temp file; it may already be gone.
        try:
            os.unlink(temp_path)
        except FileNotFoundError:
            pass

    return save_path, file_sha256.hexdigest()
| 990 | |
| 991 | async def clean_chunks(self, upload_id: str, save_path: str): |
| 992 | """清理 OpenDAL 存储上的临时分片文件""" |