Wait for batch respecting self.max_batch_size and self.timeout_s. Returns a tuple of (batch, computed_batch_size) where batch contains up to self.max_batch_size items. Waits for up to self.timeout_s after receiving the first request that will be in the next batch. After the timeout, returns as many items as are ready.
(self)
| 252 | return self.batch_size_fn(items) |
| 253 | |
| 254 | async def wait_for_batch(self) -> Tuple[List[_SingleRequest], int]: |
| 255 | """Wait for batch respecting self.max_batch_size and self.timeout_s. |
| 256 | |
| 257 | Returns a tuple of (batch, computed_batch_size) where batch contains |
| 258 | up to self.max_batch_size items. Waits for up to self.timeout_s after |
| 259 | receiving the first request that will be in the next batch. After the |
| 260 | timeout, returns as many items as are ready. |
| 261 | |
| 262 | Always returns a batch with at least one item - will block |
| 263 | indefinitely until an item comes in. |
| 264 | """ |
| 265 | |
| 266 | batch = [] |
| 267 | first_item = await self.queue.get() # Block until first item arrives |
| 268 | |
| 269 | # Cache current max_batch_size and batch_wait_timeout_s for this batch. |
| 270 | max_batch_size = self.max_batch_size |
| 271 | batch_wait_timeout_s = self.batch_wait_timeout_s |
| 272 | |
| 273 | # Check if first item alone exceeds max_batch_size (only with batch_size_fn) |
| 274 | if self.batch_size_fn is not None: |
| 275 | first_item_size = self._compute_batch_size([first_item]) |
| 276 | if first_item_size > max_batch_size: |
| 277 | exc = RuntimeError( |
| 278 | "Size of item is greater than max_batch_size. " |
| 279 | "Please increase the max_batch_size or check the " |
| 280 | "implementation of the batch_size_fn." |
| 281 | ) |
| 282 | # Set exception on the future so the caller receives it |
| 283 | first_item.future.set_exception(exc) |
| 284 | return [], 0 |
| 285 | |
| 286 | batch.append(first_item) |
| 287 | |
| 288 | # Wait self.timeout_s seconds for new queue arrivals. |
| 289 | batch_start_time = time.time() |
| 290 | while True: |
| 291 | # Record queue length metric. |
| 292 | self._batch_queue_length_gauge.set( |
| 293 | self.queue.qsize(), tags={"function_name": self._function_name} |
| 294 | ) |
| 295 | |
| 296 | remaining_batch_time_s = max( |
| 297 | batch_wait_timeout_s - (time.time() - batch_start_time), 0 |
| 298 | ) |
| 299 | try: |
| 300 | # Wait for new arrivals. |
| 301 | await asyncio.wait_for( |
| 302 | self.requests_available_event.wait(), remaining_batch_time_s |
| 303 | ) |
| 304 | except asyncio.TimeoutError: |
| 305 | pass |
| 306 | |
| 307 | # Custom batch size function logic |
| 308 | if self.batch_size_fn is not None: |
| 309 | # Add all new arrivals to the batch. |
| 310 | # Track items we need to put back if they don't fit |
| 311 | deferred_item = None |
no test coverage detected