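Registers a new queue in the management system.

Args:
    queue (queue.Queue): The queue object to register.
    queue_name (str): Base identifier for the queue (default: "unknown").

Note: The stored key is the base name with id(queue) appended (the object's memory address in CPython), which keeps keys unique per queue object.

Raises:
    ValueError: If a queue with the generated name already exists.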
(self, queue: queue.Queue, queue_name="unknown")
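The naming behaviour described above can be illustrated with a minimal sketch. The class name `QueueManager` and its `__init__` are assumptions made for illustration; the source shows only the `rr_queue` attribute and the `update` method, so the real enclosing class may look different.

```python
import queue
from collections import defaultdict


class QueueManager:  # hypothetical class name, not taken from the source
    def __init__(self):
        # Mirrors the one attribute visible in the source listing.
        self.rr_queue = defaultdict(queue.Queue)

    def update(self, queue: queue.Queue, queue_name="unknown"):
        # The stored key is the base name plus the object's id().
        queue_name = f"{queue_name}_{id(queue)}"
        if queue_name in self.rr_queue:
            raise ValueError(f"Queue name '{queue_name}' already exists.")
        self.rr_queue[queue_name] = queue


manager = QueueManager()
jobs = queue.Queue()

manager.update(jobs, "jobs")
registered_names = list(manager.rr_queue)  # one key: "jobs_<id(jobs)>"

# Registering the same object under the same base name is rejected.
try:
    manager.update(jobs, "jobs")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

# The same object under a different base name yields a different key.
manager.update(jobs, "audit")
```

Because the key includes `id(queue)`, callers would need to look queues up by the generated name rather than by the base name they passed in.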
        self.rr_queue = defaultdict(queue.Queue)

    def update(self, queue: queue.Queue, queue_name="unknown"):
        """
        Registers a new queue in the management system.

        Args:
            queue (queue.Queue): The queue object to register.
            queue_name (str): Base identifier for the queue (default: "unknown").

        Note:
            The stored key is f"{queue_name}_{id(queue)}", so the same base
            name can be reused for different queue objects.

        Raises:
            ValueError: If a queue with the generated name already exists,
                which happens when the same queue object is registered
                twice under the same base name.
        """
        # Build a unique key from the base name and the object's identity.
        queue_name = f"{queue_name}_{id(queue)}"
        if queue_name in self.rr_queue:
            raise ValueError(f"Queue name '{queue_name}' already exists.")
        self.rr_queue[queue_name] = queue

    def check(self):
        """
no outgoing calls
no test coverage detected