MCPcopy
hub / github.com/hacs/integration / execute

Method execute

custom_components/hacs/utils/queue_manager.py:43–82  ·  view source on GitHub ↗

Execute the tasks in the queue.

(self, number_of_tasks: int | None = None)

Source from the content-addressed store, hash-verified

41 self.queue.append(task)
42
43 async def execute(self, number_of_tasks: int | None = None) -> None:
44 """Execute the tasks in the queue."""
45 if self.running:
46 _LOGGER.debug("<QueueManager> Execution is already running")
47 raise HacsExecutionStillInProgress
48 if len(self.queue) == 0:
49 _LOGGER.debug("<QueueManager> The queue is empty")
50 return
51
52 self.running = True
53
54 _LOGGER.debug("<QueueManager> Checking out tasks to execute")
55 local_queue = []
56
57 if number_of_tasks:
58 for task in self.queue[:number_of_tasks]:
59 local_queue.append(task)
60 else:
61 for task in self.queue:
62 local_queue.append(task)
63
64 _LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
65 start = time.time()
66 result = await asyncio.gather(*local_queue, return_exceptions=True)
67 for entry in result:
68 if isinstance(entry, Exception):
69 _LOGGER.error("<QueueManager> %s", entry)
70 end = time.time() - start
71
72 for task in local_queue:
73 self.queue.remove(task)
74
75 _LOGGER.debug(
76 "<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
77 len(local_queue),
78 end,
79 )
80 if self.has_pending_tasks:
81 _LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))
82 self.running = False

Callers 5

download_contentMethod · 0.95
test_queue_managerFunction · 0.95
_handle_queueMethod · 0.80
_handle_queueMethod · 0.80

Calls 1

removeMethod · 0.80

Tested by 1

test_queue_managerFunction · 0.76