(
self, request: RegistryServer_pb2.ListDataSourcesRequest, context
)
| 272 | ).to_proto() |
| 273 | |
def ListDataSources(
    self, request: RegistryServer_pb2.ListDataSourcesRequest, context
):
    """Return one page of the data sources registered for ``request.project``.

    The data sources are read from ``self.proxied_registry`` (using the
    request's ``allow_cache`` flag and ``tags`` filter), passed through
    ``permitted_resources`` with the ``DESCRIBE`` action, and then handed to
    ``apply_pagination_and_sorting`` together with the request's
    ``pagination`` and ``sorting`` settings.

    Args:
        request: The ``ListDataSourcesRequest``; its ``project``,
            ``allow_cache``, ``tags``, ``pagination`` and ``sorting`` fields
            are read.
        context: The RPC context; not used by this method.

    Returns:
        A ``ListDataSourcesResponse`` carrying the protobuf form of each data
        source on the selected page plus the pagination metadata.
    """
    # Step 1: fetch every data source matching the project and tag filter.
    registered = self.proxied_registry.list_data_sources(
        project=request.project,
        allow_cache=request.allow_cache,
        tags=dict(request.tags),  # copy the request's tags into a plain dict
    )

    # Step 2: run the permission check for the DESCRIBE action.
    # The cast only affects static typing; it has no runtime effect.
    describable = permitted_resources(
        resources=cast(list[FeastObject], registered),
        actions=AuthzedAction.DESCRIBE,
    )

    # Step 3: sort and slice according to the request.
    page, page_info = apply_pagination_and_sorting(
        describable,
        pagination=request.pagination,
        sorting=request.sorting,
    )

    # Step 4: serialize the page into the response message.
    source_protos = [source.to_proto() for source in page]
    return RegistryServer_pb2.ListDataSourcesResponse(
        data_sources=source_protos,
        pagination=page_info,
    )
| 299 | |
| 300 | def DeleteDataSource( |
| 301 | self, request: RegistryServer_pb2.DeleteDataSourceRequest, context |
nothing calls this directly
no test coverage detected