A Queue-like container for IPC with optional message batching.
| 447 | |
| 448 | |
| 449 | class FusedIpcQueue: |
| 450 | ''' A Queue-like container for IPC with optional message batched. ''' |
| 451 | |
def __init__(self,
             address: Optional[tuple[str, Optional[bytes]]] = None,
             *,
             is_server: bool,
             fuse_message=False,
             fuse_size=100000,
             error_queue=None,
             queue_cls=ZeroMqQueue,
             **kwargs):
    ''' Build the underlying queue and initialise the batching state.

    Args:
        address: Forwarded to ``queue_cls`` as its ``address`` argument.
        is_server: Forwarded to ``queue_cls`` as its ``is_server`` argument.
        fuse_message: When truthy, ``put`` stages objects in a local queue
            and a background thread ships them as lists.
        fuse_size: Upper bound on the number of staged objects packed into
            one fused message.
        error_queue: Handed to the sender thread when that thread is
            created.
        queue_cls: Callable used to construct the underlying queue.
        **kwargs: Extra keyword arguments forwarded to ``queue_cls``.
    '''
    # The transport is created first, exactly as before, so a failing
    # constructor leaves no other attribute behind.
    self.queue = queue_cls(address=address, is_server=is_server, **kwargs)

    # Batching configuration.
    self.fuse_message = fuse_message
    self.fuse_size = fuse_size
    self.error_queue = error_queue

    # Sender bookkeeping; the thread itself is created lazily by
    # setup_sender().
    self._send_thread = None
    self._obj_counter = 0
    self._message_counter = 0

    # A local staging queue exists only when fusing is enabled.
    if fuse_message:
        self.sending_queue = Queue()
    else:
        self.sending_queue = None
| 470 | |
def setup_sender(self):
    ''' Start the background sender thread the first time it is needed.

    Does nothing when fusing is disabled or when a sender thread has
    already been created.
    '''
    if not (self.fuse_message and self._send_thread is None):
        return

    def send_task():
        # Endless polling loop: drain up to ``fuse_size`` staged objects
        # into one list and forward it; nap briefly when nothing is staged.
        while True:
            pending = self.sending_queue.qsize()
            if pending <= 0:
                time.sleep(0.001)
                continue

            batch_len = min(self.fuse_size, pending)
            # The object counter is bumped before the objects are drained.
            self._obj_counter += batch_len
            batch = []
            for _ in range(batch_len):
                batch.append(self.sending_queue.get_nowait())
            self.queue.put(batch)
            self._message_counter += 1

    sender = ManagedThread(send_task,
                           name="fused_send_thread",
                           error_queue=self.error_queue)
    self._send_thread = sender
    sender.start()
| 493 | |
def put(self, obj: Any):
    ''' Enqueue ``obj`` for sending.

    With fusing enabled the object is staged in ``sending_queue`` for the
    sender thread. Otherwise it is put on the underlying queue right away,
    wrapped in a one-element list unless it already is a list.
    '''
    # Returns immediately unless fusing is on and no sender exists yet.
    self.setup_sender()

    if not self.fuse_message:
        payload = [obj] if not isinstance(obj, list) else obj
        self.queue.put(payload)
        return

    self.sending_queue.put_nowait(obj)
| 501 | |
def get(self) -> Any:
    ''' Return whatever the underlying queue's ``get()`` yields. '''
    received = self.queue.get()
    return received
| 504 | |
| 505 | @property |
| 506 | def address(self) -> tuple[str, Optional[bytes]]: |
no outgoing calls