Threaded cached implementation of dask.get. Parameters: dsk (dict): A dask dictionary specifying a workflow. keys (key or list of keys): Keys corresponding to desired data. num_workers (integer thread count): The number of threads to use in the ThreadPool that will actually execute tasks.
(
dsk: Mapping,
keys: Sequence[Key] | Key,
cache=None,
num_workers=None,
pool=None,
**kwargs,
)
| 60 | |
| 61 | |
| 62 | def get( |
| 63 | dsk: Mapping, |
| 64 | keys: Sequence[Key] | Key, |
| 65 | cache=None, |
| 66 | num_workers=None, |
| 67 | pool=None, |
| 68 | **kwargs, |
| 69 | ): |
| 70 | """Threaded cached implementation of dask.get |
| 71 | |
| 72 | Parameters |
| 73 | ---------- |
| 74 | |
| 75 | dsk: dict |
| 76 | A dask dictionary specifying a workflow |
| 77 | keys: key or list of keys |
| 78 | Keys corresponding to desired data |
| 79 | num_workers: integer of thread count |
| 80 | The number of threads to use in the ThreadPool that will actually execute tasks |
| 81 | cache: dict-like (optional) |
| 82 | Temporary storage of results |
| 83 | |
| 84 | Examples |
| 85 | -------- |
| 86 | >>> inc = lambda x: x + 1 |
| 87 | >>> add = lambda x, y: x + y |
| 88 | >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')} |
| 89 | >>> get(dsk, 'w') |
| 90 | 4 |
| 91 | >>> get(dsk, ['w', 'y']) |
| 92 | (4, 2) |
| 93 | """ |
| 94 | global default_pool |
| 95 | pool = pool or config.get("pool", None) |
| 96 | num_workers = num_workers or config.get("num_workers", None) |
| 97 | thread = current_thread() |
| 98 | |
| 99 | with pools_lock: |
| 100 | if pool is None: |
| 101 | if num_workers is None and thread is main_thread: |
| 102 | if default_pool is None: |
| 103 | default_pool = ContextAwareThreadPoolExecutor(CPU_COUNT) |
| 104 | atexit.register(default_pool.shutdown) |
| 105 | pool = default_pool |
| 106 | elif thread in pools and num_workers in pools[thread]: |
| 107 | pool = pools[thread][num_workers] |
| 108 | else: |
| 109 | pool = ContextAwareThreadPoolExecutor(num_workers) |
| 110 | atexit.register(pool.shutdown) |
| 111 | pools[thread][num_workers] = pool |
| 112 | elif isinstance(pool, multiprocessing.pool.Pool): |
| 113 | pool = MultiprocessingPoolExecutor(pool) |
| 114 | |
| 115 | results = get_async( |
| 116 | pool.submit, |
| 117 | pool._max_workers, |
| 118 | dsk, |
| 119 | keys, |