MCPcopy
hub / github.com/celery/celery / test_chain_error_handler_with_eta

Method test_chain_error_handler_with_eta

t/integration/test_canvas.py:348–364  ·  view source on GitHub ↗
(self, manager)

Source from the content-addressed store, hash-verified

346 assert result == 3
347
348 def test_chain_error_handler_with_eta(self, manager):
349 try:
350 manager.app.backend.ensure_chords_allowed()
351 except NotImplementedError as e:
352 raise pytest.skip(e.args[0])
353
354 eta = datetime.now(timezone.utc) + timedelta(seconds=10)
355 c = chain(
356 group(
357 add.s(1, 2),
358 add.s(3, 4),
359 ),
360 tsum.s()
361 ).on_error(print_unicode.s()).apply_async(eta=eta)
362
363 result = c.get()
364 assert result == 10
365
366 @flaky
367 def test_groupresult_serialization(self, manager):

Callers

nothing calls this directly

Calls 8

chainClass · 0.90
groupClass · 0.90
ensure_chords_allowedMethod · 0.45
nowMethod · 0.45
apply_asyncMethod · 0.45
on_errorMethod · 0.45
sMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected