Add a request to be batched Args: request_data: The request data dictionary Returns: The response from batch processing Raises: BatchingError: If request cannot be processed
(self, request_data: Dict[str, Any])
| 228 | self._processor_func = processor_func |
| 229 | |
| 230 | def add_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]: |
| 231 | """ |
| 232 | Add a request to be batched |
| 233 | |
| 234 | Args: |
| 235 | request_data: The request data dictionary |
| 236 | |
| 237 | Returns: |
| 238 | The response from batch processing |
| 239 | |
| 240 | Raises: |
| 241 | BatchingError: If request cannot be processed |
| 242 | """ |
| 243 | try: |
| 244 | # Generate key for request grouping |
| 245 | queue_key = self._get_request_key(request_data) |
| 246 | |
| 247 | # Create queue and processor if needed |
| 248 | if queue_key not in self.queues: |
| 249 | self.queues[queue_key] = queue.Queue() |
| 250 | self._create_batch_processor(queue_key) |
| 251 | |
| 252 | # Create batch request |
| 253 | future = Future() |
| 254 | batch_request = BatchRequest( |
| 255 | request_data=request_data, |
| 256 | future=future, |
| 257 | timestamp=time.time(), |
| 258 | model=request_data.get('model', 'default'), |
| 259 | approach=request_data.get('optillm_approach') |
| 260 | ) |
| 261 | |
| 262 | # Add to appropriate queue |
| 263 | self.queues[queue_key].put(batch_request) |
| 264 | |
| 265 | if self.enable_logging: |
| 266 | logger.debug(f"Added request to batch queue {queue_key}") |
| 267 | |
| 268 | # Wait for result |
| 269 | return future.result() |
| 270 | |
| 271 | except Exception as e: |
| 272 | raise BatchingError(f"Failed to process request: {str(e)}") |
| 273 | |
| 274 | def get_stats(self) -> Dict[str, Any]: |
| 275 | """Get batching statistics""" |