Get file url from minio :param bucket_name: the name of the bucket :param purpose: the purpose of the file :param file_id: the id of the file, e.g. txt_x123456 :param tenant_id: the id of the tenant. Error will be raised if the tenant_id is not valid
(
self,
bucket_name: str,
purpose: str,
file_id: str,
tenant_id: str,
)
| 351 | return False |
| 352 | |
def get_file_url(
    self,
    bucket_name: str,
    purpose: str,
    file_id: str,
    tenant_id: str,
) -> str:
    """
    Get the URL of a stored file.

    In S3 mode the URL is the configured public bucket domain (or, when that
    is unset, the endpoint URL followed by the bucket name) joined with the
    object key. Otherwise the file is looked up in the local volume directory
    derived from the object key, and a URL under the host URL is returned.

    :param bucket_name: the name of the bucket (only used in S3 mode when no
        public bucket domain is configured)
    :param purpose: the purpose of the file
    :param file_id: the id of the file, e.g. txt_x123456
    :param tenant_id: the id of the tenant. Error will be raised if the tenant_id is not valid
    :return: the url of the file
    :raises FileNotFoundError: in local mode, if the directory for the file
        does not exist or contains no regular file
    """

    ext, _id = _validate_file_id(file_id)
    key = _object_key(purpose, _id, ext, tenant_id)
    if self._is_s3:
        domain = self._bucket_public_domain or f"{self._endpoint_url}/{bucket_name}"
        return f"{domain}/{key}"

    # Local storage: the key without its ".<ext>" suffix names a directory
    # under the volume; the stored file lives inside that directory.
    path = key.removesuffix(f".{ext}")
    file_path = f"{self._volume}/{path}"

    # os.listdir returns entries in arbitrary order, so sort to make the
    # choice deterministic when the directory holds more than one file.
    file = next(
        (
            entry
            for entry in sorted(os.listdir(file_path))
            if os.path.isfile(os.path.join(file_path, entry))
        ),
        None,
    )
    if file is None:
        # Raise a descriptive error instead of leaking a bare StopIteration.
        raise FileNotFoundError(f"No file found in {file_path}")
    return f"{self._host_url}/{path}/{file}"
| 380 | |
| 381 | async def delete_file( |
| 382 | self, |
no test coverage detected