(self, registry_proto: RegistryProto)
| 71 | self.s3_client.Object(self._bucket, self._key).delete() |
| 72 | |
def _write_registry(self, registry_proto: RegistryProto):
    """Stamp ``registry_proto`` and upload its serialized bytes to S3.

    The proto is mutated in place: ``version_id`` is replaced with a
    freshly generated UUID4 string and ``last_updated`` is set from
    ``_utc_now()``. The serialized message is then written to the object
    at ``self._key`` in ``self._bucket``.

    Args:
        registry_proto: Registry message to stamp and persist.
    """
    registry_proto.version_id = str(uuid.uuid4())
    registry_proto.last_updated.FromDatetime(_utc_now())
    # we have already checked the bucket exists so no need to do it again
    # The context manager guarantees the temporary file is closed (and its
    # descriptor released) once the upload finishes or raises; previously
    # the handle was left open until garbage collection.
    with TemporaryFile() as file_obj:
        file_obj.write(registry_proto.SerializeToString())
        # Rewind so the upload reads the payload from the beginning.
        file_obj.seek(0)
        self.s3_client.Bucket(self._bucket).put_object(
            Body=file_obj, Key=self._key, **self._boto_extra_args
        )
| 83 | |
| 84 | def set_project_metadata(self, project: str, key: str, value: str): |
| 85 | registry_proto = self.get_registry_proto() |
no test coverage detected