创建远程存储后端
(self)
| 125 | return has_config |
| 126 | |
| 127 | def _create_remote_backend(self) -> Optional[StorageBackend]: |
| 128 | """创建远程存储后端""" |
| 129 | try: |
| 130 | from trendradar.storage.remote import RemoteStorageBackend |
| 131 | |
| 132 | return RemoteStorageBackend( |
| 133 | bucket_name=self.remote_config.get("bucket_name") or os.environ.get("S3_BUCKET_NAME", ""), |
| 134 | access_key_id=self.remote_config.get("access_key_id") or os.environ.get("S3_ACCESS_KEY_ID", ""), |
| 135 | secret_access_key=self.remote_config.get("secret_access_key") or os.environ.get("S3_SECRET_ACCESS_KEY", ""), |
| 136 | endpoint_url=self.remote_config.get("endpoint_url") or os.environ.get("S3_ENDPOINT_URL", ""), |
| 137 | region=self.remote_config.get("region") or os.environ.get("S3_REGION", ""), |
| 138 | enable_txt=self.enable_txt, |
| 139 | enable_html=self.enable_html, |
| 140 | timezone=self.timezone, |
| 141 | ) |
| 142 | except ImportError as e: |
| 143 | print(f"[存储管理器] 远程后端导入失败: {e}") |
| 144 | print("[存储管理器] 请确保已安装 boto3: pip install boto3") |
| 145 | return None |
| 146 | except Exception as e: |
| 147 | print(f"[存储管理器] 远程后端初始化失败: {e}") |
| 148 | return None |
| 149 | |
| 150 | def get_backend(self) -> StorageBackend: |
| 151 | """获取存储后端实例""" |
no test coverage detected