Background thread that forms and processes batches
()
| 134 | queue_key: The key identifying the queue/model type |
| 135 | """ |
def batch_processor():
    """Background thread that forms and processes batches.

    Loops until ``self._shutdown`` is truthy. Each iteration opens a
    collection window of ``self.max_wait_seconds`` and pulls requests from
    ``self.queues[queue_key]`` until one of the following happens:

    * ``self.max_batch_size`` requests have been collected,
    * the window's deadline passes, or
    * the queue stays empty for the remainder of the window.

    A non-empty batch is recorded in ``self.stats`` and then passed to
    ``self._process_batch``. Any exception raised during an iteration is
    logged with its traceback and the loop moves on to the next batch.

    ``self``, ``queue_key`` and ``logger`` are taken from the enclosing
    scope; the function takes no arguments and returns nothing.
    """
    if self.enable_logging:
        logger.debug(f"Batch processor started for {queue_key}")

    while not self._shutdown:
        try:
            batch = []
            queue_obj = self.queues[queue_key]
            # The deadline uses the monotonic clock so that a wall-clock
            # adjustment (NTP step, manual change) cannot stretch or
            # collapse the collection window.
            deadline = time.monotonic() + self.max_wait_seconds

            # Collect requests until batch is full or timeout
            while len(batch) < self.max_batch_size:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    # Minimum 1ms timeout
                    request = queue_obj.get(timeout=max(0.001, remaining))
                except queue.Empty:
                    # Nothing else arrived inside the window.
                    break
                batch.append(request)

                if self.enable_logging and len(batch) == 1:
                    logger.debug(f"Started batch formation for {queue_key}")

            if batch:
                # Wall-clock snapshot shared by the log line and the stats.
                # NOTE(review): assumes req.timestamp was produced by
                # time.time() -- confirm against the request type.
                now = time.time()

                if self.enable_logging:
                    wait_time = now - batch[0].timestamp
                    logger.info(f"Processing batch of {len(batch)} requests for {queue_key} "
                                f"(waited {wait_time*1000:.1f}ms)")

                # Update stats
                self.stats['total_batches'] += 1
                self.stats['total_requests'] += len(batch)
                self.stats['avg_batch_size'] = self.stats['total_requests'] / self.stats['total_batches']
                self.stats['total_wait_time'] += sum(now - req.timestamp for req in batch)

                self._process_batch(batch)

        except Exception as e:
            # logger.exception logs at ERROR level and appends the traceback,
            # which the previous logger.error call discarded.
            logger.exception(f"Error in batch processor for {queue_key}: {e}")
            # Continue processing other batches

    if self.enable_logging:
        logger.debug(f"Batch processor stopped for {queue_key}")
| 180 | |
| 181 | thread = threading.Thread(target=batch_processor, daemon=True) |
| 182 | thread.start() |
nothing calls this directly
no test coverage detected