Remove tasks from task queue. Arguments: queue_entries: An iterable list of task to remove
( # noqa: C901, PLR0912
celery_queue: "LocalCeleryQueue",
queue_entries: Iterable["QueueEntry"],
)
| 11 | |
| 12 | |
| 13 | def remove_tasks( # noqa: C901, PLR0912 |
| 14 | celery_queue: "LocalCeleryQueue", |
| 15 | queue_entries: Iterable["QueueEntry"], |
| 16 | ): |
| 17 | """Remove tasks from task queue. |
| 18 | |
| 19 | Arguments: |
| 20 | queue_entries: An iterable list of task to remove |
| 21 | """ |
| 22 | from celery.result import AsyncResult |
| 23 | |
| 24 | stash_revs: dict[str, ExpStashEntry] = {} |
| 25 | failed_stash_revs: list[ExpStashEntry] = [] |
| 26 | done_entry_set: set[QueueEntry] = set() |
| 27 | stash_rev_all = celery_queue.stash.stash_revs |
| 28 | failed_rev_all: dict[str, ExpStashEntry] = {} |
| 29 | if celery_queue.failed_stash: |
| 30 | failed_rev_all = celery_queue.failed_stash.stash_revs |
| 31 | for entry in queue_entries: |
| 32 | if entry.stash_rev in stash_rev_all: |
| 33 | stash_revs[entry.stash_rev] = stash_rev_all[entry.stash_rev] |
| 34 | else: |
| 35 | done_entry_set.add(entry) |
| 36 | if entry.stash_rev in failed_rev_all: |
| 37 | failed_stash_revs.append(failed_rev_all[entry.stash_rev]) |
| 38 | |
| 39 | try: |
| 40 | for msg, queue_entry in celery_queue._iter_queued(): |
| 41 | if queue_entry.stash_rev in stash_revs and msg.delivery_tag: |
| 42 | celery_queue.celery.reject(msg.delivery_tag) |
| 43 | finally: |
| 44 | celery_queue.stash.remove_revs(list(stash_revs.values())) |
| 45 | |
| 46 | try: |
| 47 | for msg, queue_entry in celery_queue._iter_processed(): |
| 48 | if queue_entry not in done_entry_set: |
| 49 | continue |
| 50 | task_id = msg.headers["id"] |
| 51 | result: AsyncResult = AsyncResult(task_id) |
| 52 | if result is not None: |
| 53 | result.forget() |
| 54 | if msg.delivery_tag: |
| 55 | celery_queue.celery.purge(msg.delivery_tag) |
| 56 | finally: |
| 57 | if celery_queue.failed_stash: |
| 58 | celery_queue.failed_stash.remove_revs(failed_stash_revs) |
| 59 | |
| 60 | |
| 61 | def _get_names(entries: Iterable[Union["QueueEntry", "QueueDoneResult"]]): |
no test coverage detected