Execute the tasks in the queue.
(self, number_of_tasks: int | None = None)
| 41 | self.queue.append(task) |
| 42 | |
async def execute(self, number_of_tasks: int | None = None) -> None:
    """Execute the tasks in the queue.

    A snapshot of the queue is awaited concurrently; tasks from that
    snapshot are removed from the queue once the batch has finished.

    Args:
        number_of_tasks: Upper bound on how many tasks are taken from the
            front of the queue. Any falsy value (``None`` or ``0``) means
            every queued task is executed.

    Raises:
        HacsExecutionStillInProgress: If a previous call is still running.
    """
    if self.running:
        _LOGGER.debug("<QueueManager> Execution is already running")
        raise HacsExecutionStillInProgress
    if not self.queue:
        _LOGGER.debug("<QueueManager> The queue is empty")
        return

    self.running = True
    try:
        _LOGGER.debug("<QueueManager> Checking out tasks to execute")
        # Copy the entries so the batch is fixed before awaiting; anything
        # appended to self.queue during the await is not part of this run.
        local_queue = (
            self.queue[:number_of_tasks] if number_of_tasks else list(self.queue)
        )

        _LOGGER.debug(
            "<QueueManager> Starting queue execution for %s tasks", len(local_queue)
        )
        start = time.time()
        # return_exceptions=True: a failing task is reported in the result
        # list instead of aborting the remaining tasks.
        result = await asyncio.gather(*local_queue, return_exceptions=True)
        for entry in result:
            if isinstance(entry, Exception):
                _LOGGER.error("<QueueManager> %s", entry)
        end = time.time() - start

        for task in local_queue:
            self.queue.remove(task)

        _LOGGER.debug(
            "<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
            len(local_queue),
            end,
        )
        if self.has_pending_tasks:
            _LOGGER.debug(
                "<QueueManager> %s tasks remaining in the queue", len(self.queue)
            )
    finally:
        # Always release the flag: if the await is cancelled or the cleanup
        # raises, leaving it set would make every later call raise
        # HacsExecutionStillInProgress.
        self.running = False