hub / github.com/hacs/integration / test_queue_manager

Function test_queue_manager

tests/utils/test_queue_manager.py:13–47  ·  view source on GitHub ↗

Test the queue manager.

(hacs: HacsBase, caplog: pytest.LogCaptureFixture)

Source from the content-addressed store, hash-verified

async def test_queue_manager(hacs: HacsBase, caplog: pytest.LogCaptureFixture) -> None:
    """Test the queue manager."""
    queue_manager = QueueManager(hass=hacs.hass)
    assert not queue_manager.running
    assert not queue_manager.has_pending_tasks
    assert queue_manager.pending_tasks == 0
    assert queue_manager.queue == []

    queue_manager.add(dummy_task())
    assert queue_manager.has_pending_tasks
    assert queue_manager.pending_tasks == 1

    for _ in range(1, 5):
        queue_manager.add(dummy_task())
    assert queue_manager.has_pending_tasks
    assert queue_manager.pending_tasks == 5

    await queue_manager.execute(1)
    assert queue_manager.has_pending_tasks
    assert queue_manager.pending_tasks == 4

    queue_manager.running = True

    with pytest.raises(HacsExecutionStillInProgress):
        await queue_manager.execute()

    queue_manager.running = False

    await queue_manager.execute()
    await queue_manager.execute()
    assert not queue_manager.running
    assert not queue_manager.has_pending_tasks
    assert queue_manager.pending_tasks == 0
    assert queue_manager.queue == []
    assert "The queue is empty" in caplog.text
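
The behaviour this test asserts can be illustrated with a small stand-in queue. This is only a sketch under stated assumptions, not the HACS implementation: `SketchQueueManager`, `ExecutionStillInProgress`, and this `dummy_task` are hypothetical names, and the log level used for the "The queue is empty" message is a guess. It only mirrors what the assertions above check: a pending counter, a `running` guard that raises, a batch limit on `execute`, and a log message when the queue is already empty.

```python
import asyncio
import logging
from typing import Any, Coroutine, List, Optional, Tuple

_LOGGER = logging.getLogger(__name__)


class ExecutionStillInProgress(Exception):
    """Hypothetical stand-in for HacsExecutionStillInProgress."""


class SketchQueueManager:
    """Hypothetical minimal queue; NOT the real HACS QueueManager."""

    def __init__(self) -> None:
        self.queue: List[Coroutine[Any, Any, Any]] = []
        self.running = False

    @property
    def pending_tasks(self) -> int:
        """Number of coroutines still waiting in the queue."""
        return len(self.queue)

    @property
    def has_pending_tasks(self) -> bool:
        """True while at least one coroutine is waiting."""
        return self.pending_tasks != 0

    def add(self, task: Coroutine[Any, Any, Any]) -> None:
        """Append a coroutine object to the queue."""
        self.queue.append(task)

    async def execute(self, number_of_tasks: Optional[int] = None) -> None:
        """Run up to number_of_tasks queued coroutines (all if None)."""
        if self.running:
            raise ExecutionStillInProgress
        if not self.queue:
            # Assumption: the real message may be logged at another level.
            _LOGGER.debug("The queue is empty")
            return
        self.running = True
        try:
            # Slicing with None takes the whole queue.
            batch = self.queue[:number_of_tasks]
            await asyncio.gather(*batch, return_exceptions=True)
            del self.queue[: len(batch)]
        finally:
            self.running = False


async def dummy_task() -> None:
    """Hypothetical no-op task used only for this sketch."""
    await asyncio.sleep(0)


async def demo() -> Tuple[List[int], bool]:
    """Replay the test's sequence; return pending counts and guard result."""
    manager = SketchQueueManager()
    for _ in range(5):
        manager.add(dummy_task())
    counts = [manager.pending_tasks]

    await manager.execute(1)
    counts.append(manager.pending_tasks)

    manager.running = True
    raised = False
    try:
        await manager.execute()
    except ExecutionStillInProgress:
        raised = True
    manager.running = False

    await manager.execute()  # drains the remaining tasks
    await manager.execute()  # queue already empty, only logs
    counts.append(manager.pending_tasks)
    return counts, raised


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Resetting `running` in a `finally` block is a design choice of the sketch: it keeps the guard from staying set if a batch fails, which matches the test's expectation that `running` is false after `execute` returns.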

Callers

nothing calls this directly

Calls (3)

add · Method · 0.95
execute · Method · 0.95
QueueManager · Class · 0.90

Tested by

no test coverage detected
