Process a batch of requests. Args: batch: List of batch requests to process.
(self, batch: List[BatchRequest])
| 183 | self.batch_threads[queue_key] = thread |
| 184 | |
def _process_batch(self, batch: List[BatchRequest]) -> None:
    """
    Process a batch of requests

    Validates the batch, hands every request's ``request_data`` to the
    configured processor function in one call, and resolves each
    request's future with the response at the same position.

    Failures are not re-raised to the caller: any exception raised while
    validating, processing, or delivering results is logged and then
    delivered to every still-pending future as a ``BatchingError`` whose
    ``__cause__`` is the original exception.

    Args:
        batch: List of batch requests to process
    """
    try:
        # Validate batch compatibility
        self._validate_batch_compatibility(batch)

        if not hasattr(self, '_processor_func'):
            raise BatchingError("No batch processor function set")

        # Extract request data
        request_data_list = [req.request_data for req in batch]

        # Process the batch
        responses = self._processor_func(request_data_list)

        # Validate response count
        if len(responses) != len(batch):
            raise BatchingError(
                f"Processor returned {len(responses)} responses "
                f"for {len(batch)} requests"
            )

        # Set results. A future that is already done (e.g. cancelled by
        # its caller) is skipped: setting a result on it would raise and
        # needlessly fail the rest of the batch.
        # NOTE(review): assumes req.future exposes the standard
        # Future.done() API (concurrent.futures / asyncio) -- confirm.
        for req, response in zip(batch, responses):
            if not req.future.done():
                req.future.set_result(response)

    except Exception as e:
        # Fail all unresolved requests in the batch with the same error
        error_msg = f"Batch processing failed: {str(e)}"
        logger.error(error_msg)

        for req in batch:
            # Futures that already hold a result/exception or were
            # cancelled cannot be failed again; attempting it would raise
            # out of this handler and strand the remaining requests.
            if req.future.done():
                continue
            error = BatchingError(error_msg)
            # Preserve the root cause for whoever inspects the future.
            error.__cause__ = e
            req.future.set_exception(error)
| 220 | |
| 221 | def set_processor(self, processor_func): |
| 222 | """ |
no test coverage detected