MCPcopy Index your code
hub / github.com/vastsa/FileCodeBox / file_exists

Method file_exists

core/storage.py:533–563  ·  view source on GitHub ↗

检查文件是否存在于S3 :param save_path: 文件路径 :return: 文件是否存在

(self, save_path: str)

Source from the content-addressed store, hash-verified

531 )
532
async def file_exists(self, save_path: str) -> bool:
    """
    Check whether an object exists in the S3 bucket.

    The key is probed with ``head_object`` up to three times, pausing 0.2s
    and then 0.4s between attempts. If every probe raises, a
    ``list_objects_v2`` call using the key as prefix is tried as a fallback
    and its result is searched for an exact key match.

    :param save_path: object key to look up in ``self.bucket_name``
    :return: True if the object was found; False otherwise, including when
        every request raised (the last error is logged as a warning)
    """
    if not save_path:
        # An empty key cannot name an object, so answer without any
        # network round-trips, retries or sleeps.
        return False

    max_attempts = 3
    async with self._client() as s3:
        last_error = None
        for attempt in range(max_attempts):
            try:
                await s3.head_object(Bucket=self.bucket_name, Key=save_path)
                return True
            except Exception as e:
                # Any failure (including "not found") is retried after a
                # short, linearly growing pause.
                last_error = e
                if attempt < max_attempts - 1:
                    await asyncio.sleep(0.2 * (attempt + 1))

        # Fallback: list by prefix and accept only an exact key match, so a
        # longer key sharing the prefix is not mistaken for the target.
        try:
            result = await s3.list_objects_v2(
                Bucket=self.bucket_name,
                Prefix=save_path,
                MaxKeys=1,
            )
            if any(
                item.get("Key") == save_path
                for item in result.get("Contents", [])
            ):
                return True
        except Exception as e:
            last_error = e

    logger.warning(f"S3文件确认失败 key={save_path}: {last_error}")
    return False
564
565
566class OneDriveFileStorage(FileStorageInterface):

Calls 3

_client — Method · 0.95
head_object — Method · 0.80
list_objects_v2 — Method · 0.80