MCPcopy Index your code
hub / github.com/vastsa/FileCodeBox / _mkdir_p

Method _mkdir_p

core/storage.py:1028–1049  ·  view source on GitHub ↗

递归创建目录(类似mkdir -p)

(self, directory_path: str)

Source from the content-addressed store, hash-verified

1026 return f"{self.base_url}{encoded_path}"
1027
async def _mkdir_p(self, directory_path: str):
    """Recursively create a remote directory, similar to ``mkdir -p``.

    The URL-decoded path is walked one component at a time. Each prefix is
    probed with a HEAD request and, when the server answers 404, created
    with a WebDAV ``MKCOL`` request.

    Args:
        directory_path: Directory path to create; percent-escapes are
            decoded before the path is split into components.

    Raises:
        HTTPException: If ``MKCOL`` answers with a status other than 200,
            201, 405 or 409. The exception carries that status code and
            the first 200 characters of the response body.
    """
    # 405 is what a WebDAV server answers when MKCOL targets a URL that is
    # already mapped (RFC 4918 section 9.3.1). That can happen when something
    # else creates the directory between our HEAD probe and our MKCOL, so
    # it is treated as "already exists" rather than as a failure.
    tolerated_statuses = (200, 201, 405, 409)

    path_obj = Path(unquote(directory_path))
    current_path = ""

    async with aiohttp.ClientSession(auth=self.auth) as session:
        # Check each level of the hierarchy, from the top down.
        for part in path_obj.parts:
            # as_posix() keeps "/" separators regardless of the host OS;
            # the result is handed to _build_url to form a request URL.
            current_path = (Path(current_path) / part).as_posix()
            url = self._build_url(current_path)

            # Probe for the directory; only the status is needed, so the
            # HEAD response is closed before MKCOL is issued.
            async with session.head(url) as resp:
                is_missing = resp.status == 404
            if not is_missing:
                continue

            # Create the missing directory level.
            async with session.request("MKCOL", url) as mkcol_resp:
                if mkcol_resp.status not in tolerated_statuses:
                    content = await mkcol_resp.text()
                    raise HTTPException(
                        status_code=mkcol_resp.status,
                        detail=f"目录创建失败: {content[:200]}",
                    )
1050
1051 async def _is_dir_empty(self, dir_path: str) -> bool:
1052 """检查目录是否为空"""

Callers 3

save_fileMethod · 0.95
save_chunkMethod · 0.95
merge_chunksMethod · 0.95

Calls 1

_build_urlMethod · 0.95

Tested by

no test coverage detected