合并本地文件系统的分片文件并返回文件路径和完整哈希值 :param upload_id: 上传会话ID :param chunk_info: 分片信息 :param save_path: 文件保存路径 :return: (文件路径, 文件哈希值)
(self, upload_id: str, chunk_info: UploadChunk, save_path: str)
| 188 | raise e |
| 189 | |
| 190 | async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: str) -> tuple[str, str]: |
| 191 | """ |
| 192 | 合并本地文件系统的分片文件并返回文件路径和完整哈希值 |
| 193 | :param upload_id: 上传会话ID |
| 194 | :param chunk_info: 分片信息 |
| 195 | :param save_path: 文件保存路径 |
| 196 | :return: (文件路径, 文件哈希值) |
| 197 | """ |
| 198 | output_path = self.root_path / save_path |
| 199 | output_path.parent.mkdir(parents=True, exist_ok=True) |
| 200 | chunk_base_dir = output_path.parent / 'chunks' / upload_id |
| 201 | file_sha256 = hashlib.sha256() |
| 202 | |
| 203 | # 使用临时文件写入,确保原子性 |
| 204 | temp_output = output_path.with_suffix('.merging') |
| 205 | try: |
| 206 | async with aiofiles.open(temp_output, "wb") as out_file: |
| 207 | for i in range(chunk_info.total_chunks): |
| 208 | # 获取分片记录 |
| 209 | chunk_record = await UploadChunk.filter(upload_id=upload_id, chunk_index=i).first() |
| 210 | if not chunk_record: |
| 211 | raise ValueError(f"分片{i}记录不存在") |
| 212 | chunk_path = chunk_base_dir / f"{i}.part" |
| 213 | if not chunk_path.exists(): |
| 214 | raise ValueError(f"分片{i}文件不存在") |
| 215 | async with aiofiles.open(chunk_path, "rb") as in_file: |
| 216 | chunk_data = await in_file.read() |
| 217 | current_hash = hashlib.sha256(chunk_data).hexdigest() |
| 218 | if current_hash != chunk_record.chunk_hash: |
| 219 | raise ValueError(f"分片{i}哈希不匹配: 期望 {chunk_record.chunk_hash}, 实际 {current_hash}") |
| 220 | file_sha256.update(chunk_data) |
| 221 | await out_file.write(chunk_data) |
| 222 | # 原子重命名 |
| 223 | temp_output.rename(output_path) |
| 224 | except Exception as e: |
| 225 | if temp_output.exists(): |
| 226 | temp_output.unlink() |
| 227 | raise e |
| 228 | return str(output_path), file_sha256.hexdigest() |
| 229 | |
| 230 | async def clean_chunks(self, upload_id: str, save_path: str): |
| 231 | """ |