Automatic request batching for OptILLM. Collects incoming requests into batches and processes them together for improved throughput. Maintains separate queues per model/approach combination to avoid mixing incompatible requests.
| 37 | pass |
| 38 | |
| 39 | class RequestBatcher: |
| 40 | """ |
| 41 | Automatic request batching for OptILLM |
| 42 | |
| 43 | Collects incoming requests into batches and processes them together |
| 44 | for improved throughput. Maintains separate queues per model type |
| 45 | to avoid incompatible mixing. |
| 46 | """ |
| 47 | |
| 48 | def __init__(self, |
| 49 | max_batch_size: int = 4, |
| 50 | max_wait_ms: int = 50, |
| 51 | enable_logging: bool = True): |
| 52 | """ |
| 53 | Initialize the request batcher |
| 54 | |
| 55 | Args: |
| 56 | max_batch_size: Maximum number of requests per batch |
| 57 | max_wait_ms: Maximum time to wait for batch formation (milliseconds) |
| 58 | enable_logging: Whether to log batching operations |
| 59 | """ |
| 60 | self.max_batch_size = max_batch_size |
| 61 | self.max_wait_seconds = max_wait_ms / 1000.0 |
| 62 | self.enable_logging = enable_logging |
| 63 | |
| 64 | # Separate queues for different models/approaches |
| 65 | self.queues: Dict[str, queue.Queue] = {} |
| 66 | self.batch_threads: Dict[str, threading.Thread] = {} |
| 67 | |
| 68 | # Stats for monitoring |
| 69 | self.stats = { |
| 70 | 'total_requests': 0, |
| 71 | 'total_batches': 0, |
| 72 | 'avg_batch_size': 0.0, |
| 73 | 'total_wait_time': 0.0 |
| 74 | } |
| 75 | |
| 76 | self._shutdown = False |
| 77 | |
| 78 | if self.enable_logging: |
| 79 | logger.info(f"RequestBatcher initialized: max_batch_size={max_batch_size}, " |
| 80 | f"max_wait_ms={max_wait_ms}") |
| 81 | |
| 82 | def _get_request_key(self, request_data: Dict[str, Any]) -> str: |
| 83 | """ |
| 84 | Generate key to group compatible requests |
| 85 | |
| 86 | Args: |
| 87 | request_data: The request data dictionary |
| 88 | |
| 89 | Returns: |
| 90 | String key for grouping compatible requests |
| 91 | """ |
| 92 | model = request_data.get('model', 'default') |
| 93 | approach = request_data.get('optillm_approach', 'none') |
| 94 | |
| 95 | # Stream requests cannot be batched |
| 96 | if request_data.get('stream', False): |
no outgoing calls