MCPcopy Index your code
hub / github.com/coleifer/huey / test_revoke_task_instance_persistent

Method test_revoke_task_instance_persistent

huey/tests/test_api.py:298–330  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

296 self.assertFalse(r1.is_revoked()) # Still revoked.
297
def test_revoke_task_instance_persistent(self):
    """Check persistent versus one-shot revocation of task instances.

    ``r1`` is revoked with ``revoke_once=False`` and ``r2`` with the
    default arguments. The assertions verify that executing the two
    dequeued tasks runs neither of them, that ``r1`` still reports as
    revoked afterwards while ``r2`` does not, and that re-enqueueing
    both tasks lets only the second one run.

    NOTE(review): the test relies on ``dequeue()`` handing the tasks back
    in the order they were enqueued (``t1`` for ``r1``, ``t2`` for ``r2``)
    -- confirm against the storage used by the test case.
    """
    state = []

    @self.huey.task()
    def task_a(n):
        # Record the argument so the test can tell which calls really ran.
        state.append(n)
        return n + 1

    r1 = task_a(1)
    r2 = task_a(2)
    r1.revoke(revoke_once=False)
    r2.revoke()
    self.assertTrue(r1.is_revoked())
    self.assertTrue(r2.is_revoked())

    # First pass: executing either revoked task yields no result.
    t1 = self.huey.dequeue()
    self.assertIsNone(self.huey.execute(t1))
    self.assertTrue(r1.is_revoked())

    t2 = self.huey.dequeue()
    self.assertIsNone(self.huey.execute(t2))
    self.assertFalse(r2.is_revoked())  # No longer revoked.
    self.assertEqual(state, [])  # Neither task body was invoked.

    # Second pass: put both tasks back; only the second one executes.
    self.huey.enqueue(t1)
    self.huey.enqueue(t2)
    self.assertIsNone(self.execute_next())
    self.assertEqual(self.execute_next(), 3)
    self.assertEqual(r2.get(), 3)
    self.assertEqual(state, [2])

    self.assertTrue(r1.is_revoked())
    self.assertFalse(r2.is_revoked())
    self.assertEqual(self.huey.result_count(), 1)  # t1's revoke id.
331
332 def test_revoke_by_id(self):
333 state = []

Callers

nothing calls this directly

Calls 8

execute_next · Method · 0.80
result_count · Method · 0.80
revoke · Method · 0.45
is_revoked · Method · 0.45
dequeue · Method · 0.45
execute · Method · 0.45
enqueue · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected