MCPcopy Index your code
hub / github.com/coleifer/huey / test_workflow

Method test_workflow

huey/tests/test_api.py:50–76  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

48 ExcCounter.counter = 0
49
50 def test_workflow(self):
51 @self.huey.task()
52 def task_a(n):
53 return n + 1
54
55 result = task_a(3)
56 self.assertTrue(isinstance(result, Result))
57 self.assertFalse(result.is_ready())
58 self.assertEqual(len(self.huey), 1) # One item in queue.
59 task = self.huey.dequeue()
60 self.assertEqual(len(self.huey), 0) # No items in queue.
61 self.assertEqual(self.huey.result_count(), 0) # No results.
62
63 self.assertEqual(result.id, task.id) # Result points to task.
64 self.assertFalse(result.is_ready())
65 self.assertTrue(result.get() is None)
66
67 # Execute task, placing result in result store and returning the value
68 # produced by the task.
69 self.assertEqual(self.huey.execute(task), 4)
70
71 # Data is present in result store, we can read it from the result
72 # instance, and after reading the value is removed.
73 self.assertEqual(self.huey.result_count(), 1)
74 self.assertTrue(result.is_ready())
75 self.assertEqual(result.get(), 4)
76 self.assertEqual(self.huey.result_count(), 0)
77
78 def test_result_store(self):
79 @self.huey.task()

Callers

nothing calls this directly

Calls 5

is_readyMethod · 0.80
result_countMethod · 0.80
dequeueMethod · 0.45
getMethod · 0.45
executeMethod · 0.45

Tested by

no test coverage detected