hub / github.com/coleifer/huey / test_priority

Method test_priority

huey/tests/test_storage.py:158–169
(self)

Source from the content-addressed store, hash-verified

        self.assertTrue(self.s.wait_result(key, timeout=1))

    def test_priority(self):
        if not self.s.priority:
            raise unittest.SkipTest('priority support required')

        priorities = (1, None, 5, None, 3, None, 9, None, 7, 0)
        for i, priority in enumerate(priorities):
            item = 'i%s-%s' % (i, priority)
            self.s.enqueue(item.encode('utf8'), priority)

        expected = [b'i6-9', b'i8-7', b'i2-5', b'i4-3', b'i0-1',
                    b'i1-None', b'i3-None', b'i5-None', b'i7-None', b'i9-0']
        self.assertEqual([self.s.dequeue() for _ in range(10)], expected)

    def test_counter(self):
        self.assertEqual(self.s.incr('k1'), 1)
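
The expected list in test_priority implies an ordering rule: higher priorities dequeue first, a priority of None ranks the same as 0, and items of equal priority come out in insertion order. The sketch below reproduces that ordering with a small in-memory heap. It is an illustration only, not huey's storage code: the class name PriorityQueueSketch is hypothetical, and the None-equals-0 rule is an assumption read off the expected list rather than taken from the library.

```python
import heapq
import itertools


class PriorityQueueSketch:
    """Hypothetical in-memory stand-in; not one of huey's storage classes."""

    def __init__(self):
        self._heap = []
        # Monotonic counter that preserves insertion order among equal priorities.
        self._counter = itertools.count()

    def enqueue(self, data, priority=None):
        # Assumption drawn from the test's expected list: None ranks the same as 0.
        rank = 0 if priority is None else priority
        # heapq is a min-heap, so negate the rank to pop the highest priority first.
        heapq.heappush(self._heap, (-rank, next(self._counter), data))

    def dequeue(self):
        # Return None when the queue is empty instead of raising.
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


queue = PriorityQueueSketch()
priorities = (1, None, 5, None, 3, None, 9, None, 7, 0)
for i, priority in enumerate(priorities):
    item = 'i%s-%s' % (i, priority)
    queue.enqueue(item.encode('utf8'), priority)

result = [queue.dequeue() for _ in range(10)]
```

Here result matches the expected list in the test: the five explicitly prioritized items in descending order, then the four None items and the priority-0 item in the order they were enqueued.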

Callers

nothing calls this directly

Calls 2

enqueue · Method · 0.45
dequeue · Method · 0.45

Tested by

no test coverage detected