MCPcopy
hub / github.com/treeverse/dvc / remove_tasks

Function remove_tasks

dvc/repo/experiments/queue/remove.py:13–58  ·  view source on GitHub ↗

Remove tasks from the task queue. Arguments: queue_entries: An iterable of tasks to remove.

(  # noqa: C901, PLR0912
    celery_queue: "LocalCeleryQueue",
    queue_entries: Iterable["QueueEntry"],
)

Source from the content-addressed store, hash-verified

11
12
def remove_tasks(  # noqa: C901, PLR0912
    celery_queue: "LocalCeleryQueue",
    queue_entries: Iterable["QueueEntry"],
):
    """Remove tasks from task queue.

    Entries whose stash rev is present in ``celery_queue.stash`` are rejected
    from the queued messages and dropped from that stash. Every other entry
    is looked up among the processed messages, where its result is forgotten
    and its message purged; if its stash rev is also present in
    ``celery_queue.failed_stash``, it is dropped from that stash as well.

    Arguments:
        celery_queue: Queue whose messages and stashes are modified.
        queue_entries: An iterable of tasks to remove.
    """
    from celery.result import AsyncResult

    stash_revs: dict[str, ExpStashEntry] = {}
    failed_stash_revs: list[ExpStashEntry] = []
    done_entry_set: set[QueueEntry] = set()
    stash_rev_all = celery_queue.stash.stash_revs
    failed_rev_all: dict[str, ExpStashEntry] = {}
    if celery_queue.failed_stash:
        failed_rev_all = celery_queue.failed_stash.stash_revs

    # Split the requested entries: those found in the stash are handled via
    # the queued messages, everything else via the processed messages.
    for entry in queue_entries:
        if entry.stash_rev in stash_rev_all:
            stash_revs[entry.stash_rev] = stash_rev_all[entry.stash_rev]
        else:
            done_entry_set.add(entry)
            if entry.stash_rev in failed_rev_all:
                failed_stash_revs.append(failed_rev_all[entry.stash_rev])

    try:
        for msg, queue_entry in celery_queue._iter_queued():
            if queue_entry.stash_rev in stash_revs and msg.delivery_tag:
                celery_queue.celery.reject(msg.delivery_tag)
    finally:
        # The stash entries are dropped even if walking the queue raised.
        celery_queue.stash.remove_revs(list(stash_revs.values()))

    try:
        for msg, queue_entry in celery_queue._iter_processed():
            if queue_entry not in done_entry_set:
                continue
            task_id = msg.headers["id"]
            # Calling the class always produces an instance, so no None
            # guard is needed before forgetting the stored result.
            AsyncResult(task_id).forget()
            if msg.delivery_tag:
                celery_queue.celery.purge(msg.delivery_tag)
    finally:
        # As above: the failed-stash entries are dropped even on error.
        if celery_queue.failed_stash:
            celery_queue.failed_stash.remove_revs(failed_stash_revs)
59
60
61def _get_names(entries: Iterable[Union["QueueEntry", "QueueDoneResult"]]):

Callers 3

removeFunction · 0.85
celery_clearFunction · 0.85
celery_removeFunction · 0.85

Calls 5

appendMethod · 0.80
_iter_queuedMethod · 0.80
remove_revsMethod · 0.80
_iter_processedMethod · 0.80
addMethod · 0.45

Tested by

no test coverage detected