(
self, timeout: Optional[float] = None
)
| 266 | logging.debug("[ComfyUI-Manager] Started new batch: %s", self.batch_id) |
| 267 | |
| 268 | def get( |
| 269 | self, timeout: Optional[float] = None |
| 270 | ) -> tuple[Optional[QueueTaskItem], int]: |
| 271 | with self.not_empty: |
| 272 | while len(self.pending_tasks) == 0: |
| 273 | self.not_empty.wait(timeout=timeout) |
| 274 | if timeout is not None and len(self.pending_tasks) == 0: |
| 275 | logging.debug("[ComfyUI-Manager] Task queue get timed out") |
| 276 | return None |
| 277 | # Pop tuple and extract the item |
| 278 | priority, counter, item = heapq.heappop(self.pending_tasks) |
| 279 | task_index = self.task_counter |
| 280 | self.running_tasks[task_index] = copy.deepcopy(item) |
| 281 | self.task_counter += 1 |
| 282 | logging.debug( |
| 283 | "[ComfyUI-Manager] Task retrieved from queue: kind=%s, ui_id=%s, task_index=%d, running_count=%d, pending_count=%d", |
| 284 | item.kind, |
| 285 | item.ui_id, |
| 286 | task_index, |
| 287 | len(self.running_tasks), |
| 288 | len(self.pending_tasks), |
| 289 | ) |
| 290 | TaskQueue.send_queue_state_update( |
| 291 | ManagerMessageName.cm_task_started.value, |
| 292 | MessageTaskStarted( |
| 293 | ui_id=item.ui_id, |
| 294 | kind=item.kind, |
| 295 | timestamp=get_now(), |
| 296 | state=self.get_current_state(), |
| 297 | ), |
| 298 | client_id=item.client_id, # Send task started only to the client that requested it |
| 299 | ) |
| 300 | return item, task_index |
| 301 | |
| 302 | async def task_done( |
| 303 | self, |
no test coverage detected