Get relationships for a specific object.
(
self, request: RegistryServer_pb2.GetObjectRelationshipsRequest, context
)
| 1185 | ) |
| 1186 | |
def GetObjectRelationships(
    self, request: RegistryServer_pb2.GetObjectRelationshipsRequest, context
):
    """Return one registry object's relationships, paginated and sorted.

    The handler first tries to resolve ``request.project`` and asserts the
    DESCRIBE action on it. It then asks the proxied registry for the
    relationships of the requested object, runs the result through
    ``apply_pagination_and_sorting`` and converts each returned item to its
    proto form.

    Args:
        request: Supplies ``project``, ``object_type``, ``object_name``,
            ``include_indirect``, ``allow_cache``, ``pagination`` and
            ``sorting``.
        context: Not used by this handler.

    Returns:
        A ``GetObjectRelationshipsResponse`` holding the selected page of
        relationship protos and the pagination metadata.

    Raises:
        Any exception other than ``FeastObjectNotFoundException`` raised by
        the project lookup or the permission assertion propagates unchanged.
    """
    project_name = request.project

    # The project lookup always allows the cache, independent of
    # ``request.allow_cache``, which only applies to the relationship query.
    #
    # NOTE(review): when FeastObjectNotFoundException is raised here, the
    # permission assertion is skipped and the relationship lookup below
    # still runs. Confirm this best-effort behaviour is intended.
    try:
        assert_permissions(
            resource=self.proxied_registry.get_project(
                name=project_name, allow_cache=True
            ),
            actions=[AuthzedAction.DESCRIBE],
        )
    except FeastObjectNotFoundException:
        pass

    found = self.proxied_registry.get_object_relationships(
        project=project_name,
        object_type=request.object_type,
        object_name=request.object_name,
        include_indirect=request.include_indirect,
        allow_cache=request.allow_cache,
    )

    page, page_info = apply_pagination_and_sorting(
        found, pagination=request.pagination, sorting=request.sorting
    )

    return RegistryServer_pb2.GetObjectRelationshipsResponse(
        relationships=[item.to_proto() for item in page],
        pagination=page_info,
    )
| 1216 | |
| 1217 | def Commit(self, request, context): |
| 1218 | for project in self.proxied_registry.list_projects(allow_cache=True): |
nothing calls this directly
no test coverage detected