MCPcopy Index your code
hub / github.com/NVIDIA/TensorRT-LLM / FusedIpcQueue

Class FusedIpcQueue

tensorrt_llm/executor/ipc.py:449–527  ·  view source on GitHub ↗

A Queue-like container for IPC with optional message batched.

Source from the content-addressed store, hash-verified

447
448
449class FusedIpcQueue:
450 ''' A Queue-like container for IPC with optional message batched. '''
451
452 def __init__(self,
453 address: Optional[tuple[str, Optional[bytes]]] = None,
454 *,
455 is_server: bool,
456 fuse_message=False,
457 fuse_size=100000,
458 error_queue=None,
459 queue_cls=ZeroMqQueue,
460 **kwargs):
461
462 self.queue = queue_cls(address=address, is_server=is_server, **kwargs)
463 self.fuse_message = fuse_message
464 self.error_queue = error_queue
465 self.fuse_size = fuse_size
466 self._message_counter = 0
467 self._obj_counter = 0
468 self._send_thread = None
469 self.sending_queue = Queue() if fuse_message else None
470
471 def setup_sender(self):
472 if not self.fuse_message or self._send_thread is not None:
473 return
474
475 def send_task():
476 while True:
477 qsize = self.sending_queue.qsize()
478 if qsize > 0:
479 qsize = min(self.fuse_size, qsize)
480 self._obj_counter += qsize
481 message = [
482 self.sending_queue.get_nowait() for _ in range(qsize)
483 ]
484 self.queue.put(message)
485 self._message_counter += 1
486 else:
487 time.sleep(0.001)
488
489 self._send_thread = ManagedThread(send_task,
490 name="fused_send_thread",
491 error_queue=self.error_queue)
492 self._send_thread.start()
493
494 def put(self, obj: Any):
495 self.setup_sender()
496 if self.fuse_message:
497 self.sending_queue.put_nowait(obj)
498 else:
499 batch = obj if isinstance(obj, list) else [obj]
500 self.queue.put(batch)
501
502 def get(self) -> Any:
503 return self.queue.get()
504
505 @property
506 def address(self) -> tuple[str, Optional[bytes]]:

Callers 3

test_FusedIpcQueueFunction · 0.90
_setup_queuesMethod · 0.85
worker_mainFunction · 0.85

Calls

no outgoing calls

Tested by 1

test_FusedIpcQueueFunction · 0.72