MCPcopy
hub / github.com/mher/flower / test_task_timeout

Method test_task_timeout

tests/unit/api/test_control.py:98–110  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

96 arguments={'queue': 'foo'})
97
98 def test_task_timeout(self):
99 celery = self._app.capp
100 celery.control.time_limit = MagicMock(
101 return_value=[{'foo': {'ok': ''}}])
102
103 r = self.post(
104 '/api/task/timeout/celery.map',
105 body={'workername': 'foo', 'hard': 3.1, 'soft': 1.2}
106 )
107 self.assertEqual(200, r.code)
108 celery.control.time_limit.assert_called_once_with(
109 'celery.map', hard=3.1, soft=1.2, destination=['foo'],
110 reply=True)
111
112 def test_task_ratelimit(self):
113 celery = self._app.capp

Callers

nothing calls this directly

Calls 1

postMethod · 0.45

Tested by

no test coverage detected