(
request: Request,
collection_id: str,
record_id: str,
data: BaseListRequest = Depends(),
path_params: Dict = Depends(path_params_required),
auth_info: Dict = Depends(auth_info_required),
)
| 45 | response_model=BaseListResponse, |
| 46 | ) |
| 47 | async def api_list_record_chunks( |
| 48 | request: Request, |
| 49 | collection_id: str, |
| 50 | record_id: str, |
| 51 | data: BaseListRequest = Depends(), |
| 52 | path_params: Dict = Depends(path_params_required), |
| 53 | auth_info: Dict = Depends(auth_info_required), |
| 54 | ): |
| 55 | check_path_params( |
| 56 | model_operator=chunk_ops, |
| 57 | object_id_required=False, |
| 58 | path_params=path_params, |
| 59 | ) |
| 60 | |
| 61 | data_prefix_filter = getattr(data, "prefix_filter", {}) |
| 62 | path_params.pop("record_id") |
| 63 | prefix_filter_dict, equal_filter_dict = await validate_list_filter(chunk_ops, path_params, data_prefix_filter) |
| 64 | equal_filter_dict.update({"record_id": record_id}) |
| 65 | chunks, has_more = await chunk_ops.list( |
| 66 | collection_id=collection_id, |
| 67 | limit=data.limit, |
| 68 | order=data.order, |
| 69 | after_id=data.after, |
| 70 | before_id=data.before, |
| 71 | prefix_filters=prefix_filter_dict, |
| 72 | equal_filters=equal_filter_dict, |
| 73 | ) |
| 74 | |
| 75 | return BaseListResponse( |
| 76 | data=[chunk.to_response_dict() for chunk in chunks], |
| 77 | fetched_count=len(chunks), |
| 78 | has_more=has_more, |
| 79 | ) |
nothing calls this directly
no test coverage detected