(self)
| 203 | on_task(msg) |
| 204 | |
| 205 | def test_on_task_InvalidTaskError(self):  # checks that on_task(msg) results in x.on_invalid_task being called with the InvalidTaskError set on strategy |
| 206 | x, on_task, msg, strategy = self.task_context(self.add.s(2, 2))  # fixture objects come from self.task_context for an add(2, 2) signature |
| 207 | exc = strategy.side_effect = InvalidTaskError()  # presumably strategy is a mock, so calling it raises exc -- TODO confirm in task_context |
| 208 | on_task(msg) |
| 209 | x.on_invalid_task.assert_called_with(None, msg, exc)  # the handler must receive exactly (None, msg, exc), i.e. the same exception instance |
| 210 | |
| 211 | def test_on_task_DecodeError(self): |
| 212 | x, on_task, msg, strategy = self.task_context(self.add.s(2, 2)) |
nothing calls this directly
no test coverage detected