MCPcopy
hub / github.com/sansan0/TrendRadar / _create_remote_backend

Method _create_remote_backend

trendradar/storage/manager.py:127–148  ·  view source on GitHub ↗

创建远程存储后端

(self)

Source from the content-addressed store, hash-verified

125 return has_config
126
127 def _create_remote_backend(self) -> Optional[StorageBackend]:
128 """创建远程存储后端"""
129 try:
130 from trendradar.storage.remote import RemoteStorageBackend
131
132 return RemoteStorageBackend(
133 bucket_name=self.remote_config.get("bucket_name") or os.environ.get("S3_BUCKET_NAME", ""),
134 access_key_id=self.remote_config.get("access_key_id") or os.environ.get("S3_ACCESS_KEY_ID", ""),
135 secret_access_key=self.remote_config.get("secret_access_key") or os.environ.get("S3_SECRET_ACCESS_KEY", ""),
136 endpoint_url=self.remote_config.get("endpoint_url") or os.environ.get("S3_ENDPOINT_URL", ""),
137 region=self.remote_config.get("region") or os.environ.get("S3_REGION", ""),
138 enable_txt=self.enable_txt,
139 enable_html=self.enable_html,
140 timezone=self.timezone,
141 )
142 except ImportError as e:
143 print(f"[存储管理器] 远程后端导入失败: {e}")
144 print("[存储管理器] 请确保已安装 boto3: pip install boto3")
145 return None
146 except Exception as e:
147 print(f"[存储管理器] 远程后端初始化失败: {e}")
148 return None
149
150 def get_backend(self) -> StorageBackend:
151 """获取存储后端实例"""

Callers 3

get_backendMethod · 0.95
pull_from_remoteMethod · 0.95
cleanup_old_dataMethod · 0.95

Calls 2

getMethod · 0.80

Tested by

no test coverage detected