MCPcopy
hub / github.com/celery/celery / test_on_task_InvalidTaskError

Method test_on_task_InvalidTaskError

t/unit/worker/test_loops.py:205–209  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

203 on_task(msg)
204
205 def test_on_task_InvalidTaskError(self):
206 x, on_task, msg, strategy = self.task_context(self.add.s(2, 2))
207 exc = strategy.side_effect = InvalidTaskError()
208 on_task(msg)
209 x.on_invalid_task.assert_called_with(None, msg, exc)
210
211 def test_on_task_DecodeError(self):
212 x, on_task, msg, strategy = self.task_context(self.add.s(2, 2))

Callers

nothing calls this directly

Calls 4

task_contextMethod · 0.95
InvalidTaskErrorClass · 0.90
on_taskFunction · 0.85
sMethod · 0.45

Tested by

no test coverage detected