Apply sorting and pagination to a list of objects at the gRPC layer. Args: objects: List of objects to paginate and sort pagination: Pagination parameters (page, limit) sorting: Sorting parameters (sort_by, sort_order) Returns: Tuple of (paginated and s
(
objects: List[Any],
pagination: Optional[RegistryServer_pb2.PaginationParams] = None,
sorting: Optional[RegistryServer_pb2.SortingParams] = None,
)
| 45 | |
| 46 | |
| 47 | def apply_pagination_and_sorting( |
| 48 | objects: List[Any], |
| 49 | pagination: Optional[RegistryServer_pb2.PaginationParams] = None, |
| 50 | sorting: Optional[RegistryServer_pb2.SortingParams] = None, |
| 51 | ) -> tuple[List[Any], RegistryServer_pb2.PaginationMetadata]: |
| 52 | """ |
| 53 | Apply sorting and pagination to a list of objects at the gRPC layer. |
| 54 | |
| 55 | Args: |
| 56 | objects: List of objects to paginate and sort |
| 57 | pagination: Pagination parameters (page, limit) |
| 58 | sorting: Sorting parameters (sort_by, sort_order) |
| 59 | |
| 60 | Returns: |
| 61 | Tuple of (paginated and sorted objects, pagination metadata) |
| 62 | """ |
| 63 | if not objects: |
| 64 | empty_metadata = RegistryServer_pb2.PaginationMetadata( |
| 65 | page=0, |
| 66 | limit=0, |
| 67 | total_count=0, |
| 68 | total_pages=0, |
| 69 | has_next=False, |
| 70 | has_previous=False, |
| 71 | ) |
| 72 | return objects, empty_metadata |
| 73 | |
| 74 | if sorting and sorting.sort_by and sorting.sort_by.strip(): |
| 75 | objects = apply_sorting(objects, sorting.sort_by, sorting.sort_order) |
| 76 | |
| 77 | total_count = len(objects) |
| 78 | |
| 79 | # Apply pagination if requested |
| 80 | if pagination and pagination.page > 0 and pagination.limit > 0: |
| 81 | start_idx = (pagination.page - 1) * pagination.limit |
| 82 | end_idx = start_idx + pagination.limit |
| 83 | paginated_objects = objects[start_idx:end_idx] |
| 84 | |
| 85 | total_pages = (total_count + pagination.limit - 1) // pagination.limit |
| 86 | has_next = pagination.page < total_pages |
| 87 | has_previous = pagination.page > 1 |
| 88 | |
| 89 | pagination_metadata = RegistryServer_pb2.PaginationMetadata( |
| 90 | page=pagination.page, |
| 91 | limit=pagination.limit, |
| 92 | total_count=total_count, |
| 93 | total_pages=total_pages, |
| 94 | has_next=has_next, |
| 95 | has_previous=has_previous, |
| 96 | ) |
| 97 | return paginated_objects, pagination_metadata |
| 98 | else: |
| 99 | # No pagination requested - return all objects |
| 100 | pagination_metadata = RegistryServer_pb2.PaginationMetadata( |
| 101 | page=0, |
| 102 | limit=0, |
| 103 | total_count=total_count, |
| 104 | total_pages=1, |
no test coverage detected