MCPcopy Index your code
hub / github.com/coleifer/huey / test_schedule_s

Method test_schedule_s

huey/tests/test_api.py:169–200  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

167 self.assertEqual(sched[0], task)
168
def test_schedule_s(self):
    """Check delay handling for a pipeline built with ``.s()`` and ``.then()``.

    The first step is ``add(1, 2)`` created with ``delay=10``; the chained
    step is ``add`` with the extra argument ``3`` and ``delay=20``. The
    assertions cover the scheduled list, readiness at "now", "+10s" and
    "+20s", rescheduling of the first result, and the final values (3, 6).
    """
    @self.huey.task()
    def add(a, b):
        return a + b

    # Build both steps in one expression; the value returned by ``then``
    # is what gets enqueued and compared below.
    pipeline = add.s(1, 2, delay=10).then(add, 3, delay=20)

    # Enqueueing is expected to hand back exactly two results; only the
    # first one is used later in this test.
    first_result, _ = self.huey.enqueue(pipeline)
    self.execute_next()

    # After processing the next item, exactly one entry is scheduled and
    # it compares equal to the pipeline.
    pending = self.huey.scheduled()
    self.assertEqual(len(pending), 1)
    self.assertEqual(pending[0], pipeline)

    # First step: not ready at the current time, ready when evaluated
    # against a timestamp 10 seconds ahead.
    after_10s = datetime.datetime.now() + datetime.timedelta(seconds=10)
    self.assertFalse(self.huey.ready_to_run(pipeline))
    self.assertTrue(self.huey.ready_to_run(pipeline, after_10s))

    # Chained step: still not ready at +10s, ready at +20s.
    after_20s = datetime.datetime.now() + datetime.timedelta(seconds=20)
    followup = pipeline.on_complete
    self.assertFalse(self.huey.ready_to_run(followup, after_10s))
    self.assertTrue(self.huey.ready_to_run(followup, after_20s))

    # Rescheduling the first result puts one item on the queue, and
    # executing it yields 1 + 2.
    first_result.reschedule()
    self.assertEqual(len(self.huey), 1)
    self.assertEqual(self.execute_next(), 3)

    # One item is queued again (presumably the chained step -- confirm
    # against the pipeline implementation). It is not ready now, is ready
    # at +20s, and executing it with that timestamp yields 3 + 3.
    self.assertEqual(len(self.huey), 1)
    queued = self.huey.dequeue()
    self.assertFalse(self.huey.ready_to_run(queued))
    self.assertTrue(self.huey.ready_to_run(queued, after_20s))
    self.assertEqual(self.huey.execute(queued, after_20s), 6)
201
202 def test_revoke_task(self):
203 state = {}

Callers

nothing calls this directly

Calls 9

sMethod · 0.80
execute_nextMethod · 0.80
scheduledMethod · 0.80
ready_to_runMethod · 0.80
rescheduleMethod · 0.80
thenMethod · 0.45
enqueueMethod · 0.45
dequeueMethod · 0.45
executeMethod · 0.45

Tested by

no test coverage detected