MCPcopy Index your code
hub / github.com/coleifer/huey / test_group_error_chaining

Method test_group_error_chaining

huey/tests/test_api.py:1532–1553  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1530 self.assertTrue('TestError' in error_state[0])
1531
1532 def test_group_error_chaining(self):
1533 # group.error() returns self, enabling .then() chaining.
1534 @self.huey.task()
1535 def fetch(n):
1536 return n
1537
1538 @self.huey.task()
1539 def on_err(exc):
1540 pass
1541
1542 @self.huey.task()
1543 def combine(results):
1544 return sum(results)
1545
1546 result = self.huey.enqueue(
1547 group([fetch.s(2), fetch.s(3)])
1548 .error(on_err)
1549 .then(combine))
1550
1551 self.assertEqual([self.execute_next() for _ in range(3)],
1552 [2, 3, 5])
1553 self.assertEqual(result(), 5)
1554
1555
1556class TestChordPrimitive(BaseTestCase):

Callers

nothing calls this directly

Calls 6

groupClass · 0.90
sMethod · 0.80
execute_nextMethod · 0.80
enqueueMethod · 0.45
thenMethod · 0.45
errorMethod · 0.45

Tested by

no test coverage detected