(self)
| 48 | ExcCounter.counter = 0 |
| 49 | |
def test_workflow(self):
    """Walk one task through enqueue, dequeue, execute and result read.

    At each step the test checks the queue length, the result-store
    count and the state of the ``Result`` handle returned when the
    decorated task function is called.
    """
    @self.huey.task()
    def task_a(n):
        return n + 1

    # Calling the decorated function yields a Result handle (not the
    # computed value) and leaves one item waiting in the queue.
    result = task_a(3)
    self.assertIsInstance(result, Result)
    self.assertFalse(result.is_ready())
    self.assertEqual(len(self.huey), 1)  # One item in queue.

    task = self.huey.dequeue()
    self.assertEqual(len(self.huey), 0)  # No items in queue.
    self.assertEqual(self.huey.result_count(), 0)  # No results.

    # Dequeued but not yet executed: the handle refers to the same task
    # and has no value to report.
    self.assertEqual(result.id, task.id)  # Result points to task.
    self.assertFalse(result.is_ready())
    self.assertIsNone(result.get())

    # Execute task, placing result in result store and returning the value
    # produced by the task.
    self.assertEqual(self.huey.execute(task), 4)

    # Data is present in result store, we can read it from the result
    # instance, and after reading the value is removed.
    self.assertEqual(self.huey.result_count(), 1)
    self.assertTrue(result.is_ready())
    self.assertEqual(result.get(), 4)
    self.assertEqual(self.huey.result_count(), 0)
| 77 | |
| 78 | def test_result_store(self): |
| 79 | @self.huey.task() |
nothing calls this directly
no test coverage detected