MCPcopy Index your code
hub / github.com/algorithmicsuperintelligence/optillm / batch_processor

Method batch_processor

optillm/batching.py:136–179  ·  view source on GitHub ↗

Background thread that forms and processes batches

()

Source from the content-addressed store, hash-verified

134 queue_key: The key identifying the queue/model type
135 """
136 def batch_processor():
137 """Background thread that forms and processes batches"""
138 if self.enable_logging:
139 logger.debug(f"Batch processor started for {queue_key}")
140
141 while not self._shutdown:
142 try:
143 batch = []
144 queue_obj = self.queues[queue_key]
145 deadline = time.time() + self.max_wait_seconds
146
147 # Collect requests until batch is full or timeout
148 while len(batch) < self.max_batch_size and time.time() < deadline:
149 timeout = max(0.001, deadline - time.time()) # Minimum 1ms timeout
150 try:
151 request = queue_obj.get(timeout=timeout)
152 batch.append(request)
153
154 if self.enable_logging and len(batch) == 1:
155 logger.debug(f"Started batch formation for {queue_key}")
156
157 except queue.Empty:
158 break
159
160 if batch:
161 if self.enable_logging:
162 wait_time = time.time() - batch[0].timestamp
163 logger.info(f"Processing batch of {len(batch)} requests for {queue_key} "
164 f"(waited {wait_time*1000:.1f}ms)")
165
166 # Update stats
167 self.stats['total_batches'] += 1
168 self.stats['total_requests'] += len(batch)
169 self.stats['avg_batch_size'] = self.stats['total_requests'] / self.stats['total_batches']
170 self.stats['total_wait_time'] += sum(time.time() - req.timestamp for req in batch)
171
172 self._process_batch(batch)
173
174 except Exception as e:
175 logger.error(f"Error in batch processor for {queue_key}: {e}")
176 # Continue processing other batches
177
178 if self.enable_logging:
179 logger.debug(f"Batch processor stopped for {queue_key}")
180
181 thread = threading.Thread(target=batch_processor, daemon=True)
182 thread.start()

Callers

nothing calls this directly

Calls 1

_process_batchMethod · 0.95

Tested by

no test coverage detected