| 96 | arguments={'queue': 'foo'}) |
| 97 | |
def test_task_timeout(self):
    """POST to the task-timeout endpoint and check the limits are forwarded.

    Replaces ``control.time_limit`` on the app's ``capp`` with a mock, posts
    hard/soft limits for ``celery.map`` with ``workername`` set to ``foo``,
    then verifies a 200 response and exactly one ``time_limit`` call
    carrying those values.
    """
    capp = self._app.capp
    # Stubbed reply that the patched time_limit returns when invoked.
    capp.control.time_limit = MagicMock(return_value=[{'foo': {'ok': ''}}])

    payload = {'workername': 'foo', 'hard': 3.1, 'soft': 1.2}
    response = self.post('/api/task/timeout/celery.map', body=payload)

    self.assertEqual(200, response.code)
    capp.control.time_limit.assert_called_once_with(
        'celery.map',
        hard=3.1,
        soft=1.2,
        destination=['foo'],
        reply=True,
    )
| 111 | |
| 112 | def test_task_ratelimit(self): |
| 113 | celery = self._app.capp |