MCPcopy Index your code
hub / github.com/ray-project/ray / wait_for_batch

Method wait_for_batch

python/ray/serve/batching.py:254–363  ·  view source on GitHub ↗

Wait for batch respecting self.max_batch_size and self.timeout_s. Returns a tuple of (batch, computed_batch_size) where batch contains up to self.max_batch_size items. Waits for up to self.timeout_s after receiving the first request that will be in the next batch. After the timeout, returns as many items as are ready.

(self)

Source from the content-addressed store, hash-verified

252 return self.batch_size_fn(items)
253
254 async def wait_for_batch(self) -> Tuple[List[_SingleRequest], int]:
255 """Wait for batch respecting self.max_batch_size and self.timeout_s.
256
257 Returns a tuple of (batch, computed_batch_size) where batch contains
258 up to self.max_batch_size items. Waits for up to self.timeout_s after
259 receiving the first request that will be in the next batch. After the
260 timeout, returns as many items as are ready.
261
262 Always returns a batch with at least one item - will block
263 indefinitely until an item comes in.
264 """
265
266 batch = []
267 first_item = await self.queue.get() # Block until first item arrives
268
269 # Cache current max_batch_size and batch_wait_timeout_s for this batch.
270 max_batch_size = self.max_batch_size
271 batch_wait_timeout_s = self.batch_wait_timeout_s
272
273 # Check if first item alone exceeds max_batch_size (only with batch_size_fn)
274 if self.batch_size_fn is not None:
275 first_item_size = self._compute_batch_size([first_item])
276 if first_item_size > max_batch_size:
277 exc = RuntimeError(
278 "Size of item is greater than max_batch_size. "
279 "Please increase the max_batch_size or check the "
280 "implementation of the batch_size_fn."
281 )
282 # Set exception on the future so the caller receives it
283 first_item.future.set_exception(exc)
284 return [], 0
285
286 batch.append(first_item)
287
288 # Wait self.timeout_s seconds for new queue arrivals.
289 batch_start_time = time.time()
290 while True:
291 # Record queue length metric.
292 self._batch_queue_length_gauge.set(
293 self.queue.qsize(), tags={"function_name": self._function_name}
294 )
295
296 remaining_batch_time_s = max(
297 batch_wait_timeout_s - (time.time() - batch_start_time), 0
298 )
299 try:
300 # Wait for new arrivals.
301 await asyncio.wait_for(
302 self.requests_available_event.wait(), remaining_batch_time_s
303 )
304 except asyncio.TimeoutError:
305 pass
306
307 # Custom batch size function logic
308 if self.batch_size_fn is not None:
309 # Add all new arrivals to the batch.
310 # Track items we need to put back if they don't fit
311 deferred_item = None

Callers 1

_process_batchesMethod · 0.95

Calls 13

_compute_batch_sizeMethod · 0.95
getMethod · 0.65
setMethod · 0.65
waitMethod · 0.65
appendMethod · 0.45
timeMethod · 0.45
qsizeMethod · 0.45
emptyMethod · 0.45
get_nowaitMethod · 0.45
popMethod · 0.45
put_nowaitMethod · 0.45
clearMethod · 0.45

Tested by

no test coverage detected