MCPcopy Index your code
hub / github.com/coleifer/huey / test_queue_methods

Method test_queue_methods

huey/tests/test_storage.py:65–83  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

63 self.s.flush_all()
64
65 def test_queue_methods(self):
66 for i in range(3):
67 self.s.enqueue(b'item-%d' % i)
68
69 # Remove two items (this API is not used, but we'll test it anyways).
70 self.assertEqual(self.s.dequeue(), b'item-0')
71 self.assertEqual(self.s.queue_size(), 2)
72 self.assertEqual(self.s.enqueued_items(), [b'item-1', b'item-2'])
73 self.assertEqual(self.s.dequeue(), b'item-1')
74 self.assertEqual(self.s.dequeue(), b'item-2')
75 self.assertTrue(self.s.dequeue() is None)
76
77 self.assertEqual(self.s.queue_size(), 0)
78
79 # Test flushing the queue.
80 self.s.enqueue(b'item-3')
81 self.assertEqual(self.s.queue_size(), 1)
82 self.s.flush_queue()
83 self.assertEqual(self.s.queue_size(), 0)
84
85 def test_schedule_methods(self):
86 timestamp = datetime.datetime(2000, 1, 2, 3, 4, 5)

Callers

nothing calls this directly

Calls 5

enqueueMethod · 0.45
dequeueMethod · 0.45
queue_sizeMethod · 0.45
enqueued_itemsMethod · 0.45
flush_queueMethod · 0.45

Tested by

no test coverage detected