MCPcopy
hub / github.com/feast-dev/feast / get_registry_proto

Method get_registry_proto

sdk/python/feast/infra/registry/s3.py:35–65  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

33 )
34
def get_registry_proto(self):
    """Download the registry object from S3 and parse it into a RegistryProto.

    The bucket named by ``self._bucket`` is probed with ``head_bucket`` first,
    then the object at ``self._key`` is downloaded into a temporary file and
    parsed.

    Returns:
        The ``RegistryProto`` parsed from the downloaded object's bytes.

    Raises:
        FeastExtrasDependencyImportError: If ``botocore`` cannot be imported.
        S3RegistryBucketNotExist: If the bucket probe reports the bucket as
            missing (error code ``404`` / ``NoSuchBucket``).
        S3RegistryBucketForbiddenAccess: If the bucket probe fails with any
            other client error.
        FileNotFoundError: If downloading the registry object fails with a
            client error.
    """
    try:
        from botocore.exceptions import ClientError
    except ImportError as e:
        from feast.errors import FeastExtrasDependencyImportError

        raise FeastExtrasDependencyImportError("aws", str(e))

    try:
        bucket = self.s3_client.Bucket(self._bucket)
        self.s3_client.meta.client.head_bucket(Bucket=bucket.name)
    except ClientError as e:
        # Compare the code as a string: S3 may report symbolic codes such as
        # "AccessDenied", and int() on those would raise an unrelated
        # ValueError that hides the real failure.
        error_code = str(e.response.get("Error", {}).get("Code", ""))
        if error_code in ("404", "NoSuchBucket"):
            raise S3RegistryBucketNotExist(self._bucket) from e
        # Every other client error is reported as an access problem.
        raise S3RegistryBucketForbiddenAccess(self._bucket) from e

    registry_proto = RegistryProto()
    # The context manager guarantees the temporary file is closed (and thus
    # removed) on both the success path and every error path.
    with TemporaryFile() as file_obj:
        try:
            bucket.Object(self._key).download_fileobj(file_obj)
        except ClientError as e:
            raise FileNotFoundError(
                f"Error while trying to locate Registry at path {self._uri.geturl()}"
            ) from e
        # Rewind so the bytes just written by the download can be read back.
        file_obj.seek(0)
        registry_proto.ParseFromString(file_obj.read())
    return registry_proto
66
67 def update_registry_proto(self, registry_proto: RegistryProto):
68 self._write_registry(registry_proto)

Callers 5

set_project_metadataMethod · 0.95
get_project_metadataMethod · 0.95
__init__Method · 0.45
_get_registry_protoMethod · 0.45

Tested by

no test coverage detected