(self, registry_proto: RegistryProto)
| 63 | pass |
| 64 | |
def _write_registry(self, registry_proto: RegistryProto):
    """Stamp the registry proto and upload it to the configured bucket blob.

    Assigns a freshly generated UUID4 string to ``version_id`` and sets
    ``last_updated`` from ``_utc_now()``, mutating ``registry_proto`` in
    place. The serialized message is then uploaded to the blob named
    ``self._blob`` inside the bucket named ``self._bucket``.

    Args:
        registry_proto: Registry message to persist; modified in place.
    """
    registry_proto.version_id = str(uuid.uuid4())
    registry_proto.last_updated.FromDatetime(_utc_now())
    # we have already checked the bucket exists so no need to do it again
    # NOTE(review): get_bucket() may still contact the service to fetch the
    # bucket; confirm whether a request-free handle was intended here.
    gs_bucket = self.gcs_client.get_bucket(self._bucket)
    blob = gs_bucket.blob(self._blob)
    # The context manager closes the temporary file even when the upload
    # raises, so the file handle is never leaked.
    with TemporaryFile() as file_obj:
        file_obj.write(registry_proto.SerializeToString())
        # Rewind so the upload reads from the start of the written bytes.
        file_obj.seek(0)
        blob.upload_from_file(file_obj)
| 75 | |
| 76 | def set_project_metadata(self, project: str, key: str, value: str): |
| 77 | registry_proto = self.get_registry_proto() |
no test coverage detected