Queue a task for the cluster.
(func, *args, **kwargs)
| 18 | |
| 19 | |
| 20 | def async_task(func, *args, **kwargs): |
| 21 | """Queue a task for the cluster.""" |
| 22 | keywords = kwargs.copy() |
| 23 | opt_keys = ( |
| 24 | "hook", |
| 25 | "group", |
| 26 | "save", |
| 27 | "sync", |
| 28 | "cached", |
| 29 | "ack_failure", |
| 30 | "iter_count", |
| 31 | "iter_cached", |
| 32 | "chain", |
| 33 | "broker", |
| 34 | "timeout", |
| 35 | ) |
| 36 | q_options = keywords.pop("q_options", {}) |
| 37 | # get an id |
| 38 | tag = uuid() |
| 39 | # build the task package |
| 40 | task = { |
| 41 | "id": tag[1], |
| 42 | "name": keywords.pop("task_name", None) |
| 43 | or q_options.pop("task_name", None) |
| 44 | or tag[0], |
| 45 | "func": func, |
| 46 | "args": args, |
| 47 | } |
| 48 | # push optionals |
| 49 | for key in opt_keys: |
| 50 | if q_options and key in q_options: |
| 51 | task[key] = q_options[key] |
| 52 | elif key in keywords: |
| 53 | task[key] = keywords.pop(key) |
| 54 | # don't serialize the broker |
| 55 | broker = task.pop("broker", get_broker()) |
| 56 | # overrides |
| 57 | if "cached" not in task and Conf.CACHED: |
| 58 | task["cached"] = Conf.CACHED |
| 59 | if "sync" not in task and Conf.SYNC: |
| 60 | task["sync"] = Conf.SYNC |
| 61 | if "ack_failure" not in task and Conf.ACK_FAILURES: |
| 62 | task["ack_failure"] = Conf.ACK_FAILURES |
| 63 | # finalize |
| 64 | task["kwargs"] = keywords |
| 65 | task["started"] = timezone.now() |
| 66 | # signal it |
| 67 | pre_enqueue.send(sender="django_q", task=task) |
| 68 | # sign it |
| 69 | pack = SignedPackage.dumps(task) |
| 70 | if task.get("sync", False): |
| 71 | return _sync(pack) |
| 72 | # push it |
| 73 | enqueue_id = broker.enqueue(pack) |
| 74 | logger.info(f"Enqueued {enqueue_id}") |
| 75 | logger.debug(f"Pushed {tag}") |
| 76 | return task["id"] |
| 77 |