MCPcopy
hub / github.com/treeverse/dvc / ThreadPoolExecutor

Class ThreadPoolExecutor

dvc/utils/threadpool.py:9–41  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

7
8
9class ThreadPoolExecutor(futures.ThreadPoolExecutor):
10 def __init__(
11 self,
12 max_workers: Optional[int] = None,
13 cancel_on_error: bool = False,
14 **kwargs,
15 ):
16 super().__init__(max_workers=max_workers, **kwargs)
17 self._cancel_on_error = cancel_on_error
18
19 def imap_unordered(
20 self, fn: Callable[..., _T], *iterables: Iterable[Any]
21 ) -> Iterator[_T]:
22 """Lazier version of map that does not preserve ordering of results.
23
24 It does not create all the futures at once to reduce memory usage.
25 """
26
27 def create_taskset(n: int) -> set[futures.Future]:
28 return {self.submit(fn, *args) for args in islice(it, n)}
29
30 it = zip(*iterables)
31 tasks = create_taskset(self._max_workers * 5)
32 while tasks:
33 done, tasks = futures.wait(tasks, return_when=futures.FIRST_COMPLETED)
34 for fut in done:
35 yield fut.result()
36 tasks.update(create_taskset(len(done)))
37
38 def __exit__(self, exc_type, exc_val, exc_tb):
39 cancel_futures = self._cancel_on_error and exc_val is not None
40 self.shutdown(wait=True, cancel_futures=cancel_futures)
41 return False

Callers 4

_resolve_data_sourcesFunction · 0.90
_getMethod · 0.90
test_cancel_futuresFunction · 0.90

Calls

no outgoing calls

Tested by 2

test_cancel_futuresFunction · 0.72