MCPcopy Index your code
hub / github.com/vastsa/FileCodeBox / save_chunk

Method save_chunk

core/storage.py:1197–1213  ·  view source on GitHub ↗

保存分片到 WebDAV

(self, upload_id: str, chunk_index: int, chunk_data: bytes, chunk_hash: str, save_path: str)

Source from the content-addressed store, hash-verified

1195 status_code=503, detail=f"WebDAV连接异常: {str(e)}")
1196
async def save_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes, chunk_hash: str, save_path: str):
    """Save one upload chunk to WebDAV.

    The chunk is written with an HTTP PUT to
    ``<parent of save_path>/chunks/<upload_id>/<chunk_index>.part``.

    Args:
        upload_id: Identifier of the upload; used as the chunk directory name.
        chunk_index: Index of this chunk; used as the ``.part`` file name.
        chunk_data: Raw bytes sent as the PUT request body.
        chunk_hash: Accepted as part of the method signature; this method
            does not read it.
        save_path: Target path of the final file; only its parent directory
            is used to locate the chunk directory.

    Raises:
        HTTPException: If the PUT response status is not 200, 201 or 204.
            The exception carries the server's status code and the first
            200 characters of the response body.
    """
    # as_posix() keeps "/" separators regardless of the host OS; str() on a
    # Windows path would produce backslashes inside the remote path.
    chunk_dir = (Path(save_path).parent / "chunks" / upload_id).as_posix()
    chunk_path = f"{chunk_dir}/{chunk_index}.part"

    # Create the directory structure first.
    await self._mkdir_p(chunk_dir)

    chunk_url = self._build_url(chunk_path)
    async with aiohttp.ClientSession(auth=self.auth) as session:
        async with session.put(chunk_url, data=chunk_data) as resp:
            if resp.status not in (200, 201, 204):
                content = await resp.text()
                raise HTTPException(
                    status_code=resp.status,
                    detail=f"分片上传失败: {content[:200]}"
                )
1214
1215 async def merge_chunks(self, upload_id: str, chunk_info: UploadChunk, save_path: str) -> tuple[str, str]:
1216 """

Callers

nothing calls this directly

Calls 2

_mkdir_pMethod · 0.95
_build_urlMethod · 0.95

Tested by

no test coverage detected