(self, project: str, key: str, value: str)
| 82 | ) |
| 83 | |
def set_project_metadata(self, project: str, key: str, value: str):
    """Set one key/value pair in the metadata entry of ``project``.

    The pairs are kept as a JSON object serialized into the ``project_uuid``
    field of the project's entry in the registry proto. The registry proto
    is read with ``self.get_registry_proto()`` and, after modification,
    handed to ``self.update_registry_proto()``.

    If the matching entry's ``project_uuid`` is empty, cannot be parsed as
    JSON, or parses to something other than a JSON object, its previous
    content is discarded and a new object containing only ``key`` is stored.

    Args:
        project: Project name used to find the entry (compared against the
            entry's ``project`` field). When no entry matches, a new one is
            created and appended.
        key: Metadata key to set.
        value: Value stored under ``key``.
    """
    registry_proto = self.get_registry_proto()

    for entry in registry_proto.project_metadata:
        if entry.project != project:
            continue

        try:
            meta = json.loads(entry.project_uuid) if entry.project_uuid else {}
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; TypeError covers a field
            # value that is not str/bytes. Anything else is a real error and
            # is allowed to propagate instead of being silently swallowed.
            meta = {}
        if not isinstance(meta, dict):
            # Valid JSON but not an object (e.g. a list or a bare string):
            # there is nothing to merge into, so start over.
            meta = {}

        meta[key] = value
        entry.project_uuid = json.dumps(meta)
        break
    else:
        # No entry for this project yet: the loop finished without `break`.
        # Imported locally, as in the original code.
        from feast.project_metadata import ProjectMetadata

        new_entry = ProjectMetadata(project_name=project)
        new_entry.project_uuid = json.dumps({key: value})
        registry_proto.project_metadata.append(new_entry.to_proto())

    self.update_registry_proto(registry_proto)
| 106 | |
| 107 | def get_project_metadata(self, project: str, key: str) -> Optional[str]: |
| 108 | registry_proto = self.get_registry_proto() |
nothing calls this directly
no test coverage detected