(self)
| 296 | self.assertFalse(r1.is_revoked()) # Still revoked. |
| 297 | |
| 298 | def test_revoke_task_instance_persistent(self): |
| 299 | state = [] |
| 300 | @self.huey.task() |
| 301 | def task_a(n): |
| 302 | state.append(n) |
| 303 | return n + 1 |
| 304 | |
| 305 | r1 = task_a(1) |
| 306 | r2 = task_a(2) |
| 307 | r1.revoke(revoke_once=False) |
| 308 | r2.revoke() |
| 309 | self.assertTrue(r1.is_revoked()) |
| 310 | self.assertTrue(r2.is_revoked()) |
| 311 | |
| 312 | t1 = self.huey.dequeue() |
| 313 | self.assertTrue(self.huey.execute(t1) is None) |
| 314 | self.assertTrue(r1.is_revoked()) |
| 315 | |
| 316 | t2 = self.huey.dequeue() |
| 317 | self.assertTrue(self.huey.execute(t2) is None) |
| 318 | self.assertFalse(r2.is_revoked()) # No longer revoked. |
| 319 | self.assertEqual(state, []) |
| 320 | |
| 321 | self.huey.enqueue(t1) |
| 322 | self.huey.enqueue(t2) |
| 323 | self.assertTrue(self.execute_next() is None) |
| 324 | self.assertEqual(self.execute_next(), 3) |
| 325 | self.assertEqual(r2.get(), 3) |
| 326 | self.assertEqual(state, [2]) |
| 327 | |
| 328 | self.assertTrue(r1.is_revoked()) |
| 329 | self.assertFalse(r2.is_revoked()) |
| 330 | self.assertEqual(self.huey.result_count(), 1) # t1's revoke id. |
| 331 | |
| 332 | def test_revoke_by_id(self): |
| 333 | state = [] |
nothing calls this directly
no test coverage detected