hub / github.com/coleifer/huey / test_group_error_handler

Method test_group_error_handler

huey/tests/test_api.py:1504–1530
(self)

Source from the content-addressed store, hash-verified

        self.assertEqual(r(), ['a', 'b', 'c'])

    def test_group_error_handler(self):
        error_state = []

        @self.huey.task()
        def task_a(n):
            if n < 0:
                raise TestError('invalid')
            return n

        @self.huey.task()
        def on_err(exc):
            error_state.append(repr(exc))

        g = group([task_a.s(1), task_a.s(-1), task_a.s(3)])
        g.error(on_err)
        self.huey.enqueue(g)
        self.assertEqual(len(self.huey), 3)

        self.execute_next()  # task_a(1)
        self.execute_next()  # task_a(-1) fails, on err enqueued.
        self.assertEqual(len(self.huey), 2)
        self.execute_next()  # task_a(3)

        self.assertEqual(len(error_state), 0)
        self.execute_next()  # on_err runs
        self.assertEqual(len(error_state), 1)
        self.assertTrue('TestError' in error_state[0])

    def test_group_error_chaining(self):
        # group.error() returns self, enabling .then() chaining.
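
The queue lengths asserted in the test above can be easier to follow with a small stand-alone model. The sketch below is a toy, pure-Python illustration and not huey's implementation: `ToyQueue`, `enqueue_group`, and the plain-function versions of `task_a` and `on_err` are hypothetical names introduced only for this example. It assumes, as the test's comments suggest, that a failing group member causes the error handler to be enqueued as its own task rather than run inline.

```python
from collections import deque


class TestError(Exception):
    pass


class ToyQueue:
    """Hypothetical stand-in for a task queue (not huey's API)."""

    def __init__(self):
        self._pending = deque()

    def __len__(self):
        return len(self._pending)

    def enqueue_group(self, calls, on_error=None):
        # Each call is (func, args); every member shares one error handler.
        for func, args in calls:
            self._pending.append((func, args, on_error))

    def execute_next(self):
        func, args, on_error = self._pending.popleft()
        try:
            return func(*args)
        except Exception as exc:
            if on_error is not None:
                # Assumption: the handler is queued as a separate task.
                self._pending.append((on_error, (exc,), None))
            return None


error_state = []


def task_a(n):
    if n < 0:
        raise TestError('invalid')
    return n


def on_err(exc):
    error_state.append(repr(exc))


queue = ToyQueue()
queue.enqueue_group(
    [(task_a, (1,)), (task_a, (-1,)), (task_a, (3,))],
    on_error=on_err,
)
assert len(queue) == 3

queue.execute_next()      # task_a(1) succeeds
queue.execute_next()      # task_a(-1) fails, handler is queued
assert len(queue) == 2    # task_a(3) plus the queued handler

queue.execute_next()      # task_a(3) succeeds
assert error_state == []  # handler has not run yet

queue.execute_next()      # handler runs
assert len(error_state) == 1
```

In this model the queue length stays at 2 after the failure because one group member is still pending and the handler has been appended behind it, which matches the ordering the test checks.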

Callers

nothing calls this directly

Calls 5

group · Class · 0.90
s · Method · 0.80
execute_next · Method · 0.80
error · Method · 0.45
enqueue · Method · 0.45

Tested by

no test coverage detected