(
self, request: RegistryServer_pb2.ListFeatureServicesRequest, context
)
| 747 | ).to_proto() |
| 748 | |
def ListFeatureServices(
    self, request: RegistryServer_pb2.ListFeatureServicesRequest, context
):
    """List feature services, optionally narrowed to one feature view.

    Feature services are fetched from the proxied registry using the
    request's ``project``, ``allow_cache`` and ``tags``. When
    ``request.feature_view`` is set, only services that have a feature view
    projection with that exact name are kept. The remaining services are
    passed through ``permitted_resources`` with the ``DESCRIBE`` action and
    then through ``apply_pagination_and_sorting``.

    Args:
        request: Carries ``project``, ``allow_cache``, ``tags``,
            ``feature_view``, ``pagination`` and ``sorting``.
        context: Unused by this method.

    Returns:
        A ``ListFeatureServicesResponse`` holding the serialized page of
        feature services and the pagination metadata.
    """
    feature_services = self.proxied_registry.list_feature_services(
        project=request.project,
        allow_cache=request.allow_cache,
        tags=dict(request.tags),
    )

    # An unset/empty feature_view means "no filter".
    wanted_view = request.feature_view
    if wanted_view:
        feature_services = [
            feature_service
            for feature_service in feature_services
            # The getattr default keeps services without projections from
            # raising; they simply never match.
            if any(
                projection.name == wanted_view
                for projection in getattr(
                    feature_service, "feature_view_projections", ()
                )
            )
        ]

    paginated_feature_services, pagination_metadata = apply_pagination_and_sorting(
        permitted_resources(
            resources=cast(list[FeastObject], feature_services),
            actions=AuthzedAction.DESCRIBE,
        ),
        pagination=request.pagination,
        sorting=request.sorting,
    )

    return RegistryServer_pb2.ListFeatureServicesResponse(
        feature_services=[
            feature_service.to_proto()
            for feature_service in paginated_feature_services
        ],
        pagination=pagination_metadata,
    )
| 792 | |
| 793 | def DeleteFeatureService( |
| 794 | self, request: RegistryServer_pb2.DeleteFeatureServiceRequest, context |
nothing calls this directly
no test coverage detected