MCPcopy Index your code
hub / github.com/coleifer/huey / schedule

Method schedule

huey/api.py:997–1025  ·  view source on GitHub ↗
(self, args=None, kwargs=None, eta=None, delay=None,
                 priority=None, retries=None, retry_delay=None, expires=None,
                 timeout=None, id=None)

Source from the content-addressed store, hash-verified

995 return self.huey.restore_all(self.task_class)
996
997 def schedule(self, args=None, kwargs=None, eta=None, delay=None,
998 priority=None, retries=None, retry_delay=None, expires=None,
999 timeout=None, id=None):
1000 if eta is None and delay is None:
1001 if isinstance(args, (int, float)):
1002 delay = args
1003 elif isinstance(args, datetime.timedelta):
1004 delay = args.total_seconds()
1005 elif isinstance(args, datetime.datetime):
1006 eta = args
1007 else:
1008 raise ValueError('schedule() missing required eta= or delay=')
1009 args = None
1010
1011 if kwargs is not None and not isinstance(kwargs, dict):
1012 raise ValueError('schedule() kwargs argument must be a dict.')
1013
1014 eta = normalize_time(eta, delay, self.huey.utc)
1015 task = self.task_class(
1016 args or (),
1017 kwargs or {},
1018 id=id,
1019 eta=eta,
1020 retries=retries,
1021 retry_delay=retry_delay,
1022 priority=priority,
1023 expires=expires,
1024 timeout=timeout)
1025 return self.huey.enqueue(task)
1026
1027 def _apply(self, it):
1028 return [self.s(*(i if isinstance(i, tuple) else (i,))) for i in it]

Callers 1

mini.pyFile · 0.45

Calls 2

normalize_timeFunction · 0.90
enqueueMethod · 0.45

Tested by

no test coverage detected