MCPcopy
hub / github.com/TaskingAI/TaskingAI / get_file_url

Method get_file_url

backend/tkhelper/database/boto3/client.py:353–379  ·  view source on GitHub ↗

Get the URL of a file from MinIO. Parameters: bucket_name — the name of the bucket; purpose — the purpose of the file; file_id — the ID of the file, e.g. txt_x123456; tenant_id — the ID of the tenant (an error is raised if the tenant_id is not valid).

(
        self,
        bucket_name: str,
        purpose: str,
        file_id: str,
        tenant_id: str,
    )

Source from the content-addressed store, hash-verified

351 return False
352
def get_file_url(
    self,
    bucket_name: str,
    purpose: str,
    file_id: str,
    tenant_id: str,
) -> str:
    """
    Get the url of a stored file.

    When the client is S3-backed (``self._is_s3``) the url is assembled from
    configuration and the object key only; the store itself is not queried.
    Otherwise the file is looked up in a directory under ``self._volume`` and
    the url is built on top of ``self._host_url``.

    :param bucket_name: the name of the bucket (used only when no public bucket domain is configured)
    :param purpose: the purpose of the file
    :param file_id: the id of the file, e.g. txt_x123456
    :param tenant_id: the id of the tenant. Error will be raised if the tenant_id is not valid
    :return: the url of the file
    :raises FileNotFoundError: in local-volume mode, if the file's directory is missing
        or contains no regular file
    """

    ext, _id = _validate_file_id(file_id)
    key = _object_key(purpose, _id, ext, tenant_id)
    if self._is_s3:
        # Prefer the configured public domain; otherwise address the bucket through the endpoint.
        domain = self._bucket_public_domain or f"{self._endpoint_url}/{bucket_name}"
        return f"{domain}/{key}"

    # Local mode: the key minus its ".<ext>" suffix names a directory, and the
    # stored file is the first regular file found inside it.
    path = key.removesuffix(f".{ext}")
    file_path = f"{self._volume}/{path}"
    file = next(
        (f for f in os.listdir(file_path) if os.path.isfile(os.path.join(file_path, f))),
        None,
    )
    if file is None:
        # A bare next() would leak StopIteration here, which is uninformative and
        # is converted to RuntimeError when it crosses a coroutine/generator frame.
        raise FileNotFoundError(f"no stored file found for file_id {file_id!r}")
    return f"{self._host_url}/{path}/{file}"
380
381 async def delete_file(
382 self,

Callers 1

download_record_file — Function · 0.80

Calls 2

_validate_file_id — Function · 0.85
_object_key — Function · 0.85

Tested by

no test coverage detected