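Multiprocessed get function appropriate for Bags.

Parameters
----------
dsk : dict
    dask graph
keys : object or list
    Desired results from graph
num_workers : int
    Number of worker processes (defaults to number of cores)
func_dumps : function
    Function to use for function serialization (defaults to cloudpickle.dumps)
func_loads : function
    Function to use for function deserialization (defaults to cloudpickle.loads)
optimize_graph : bool
    If True [default], `fuse` is applied to the graph before computation.
pool : Executor or Pool
    An `Executor` or `Pool` to run tasks on. If not given, one is taken from config; otherwise a new `ProcessPoolExecutor` is created.
initializer : function
    Ignored if ``pool`` has been set.
    Function to initialize a worker process before running any tasks in it.
chunksize : int, optional
    Size of chunks to use when dispatching work.
    Defaults to 6 as some batching is helpful.
    If -1, will be computed to evenly divide ready work across workers.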
(
dsk: Mapping,
keys: Sequence[Key] | Key,
num_workers=None,
func_loads=None,
func_dumps=None,
optimize_graph=True,
pool=None,
initializer=None,
chunksize=None,
**kwargs,
)
| 145 | |
| 146 | |
| 147 | def get( |
| 148 | dsk: Mapping, |
| 149 | keys: Sequence[Key] | Key, |
| 150 | num_workers=None, |
| 151 | func_loads=None, |
| 152 | func_dumps=None, |
| 153 | optimize_graph=True, |
| 154 | pool=None, |
| 155 | initializer=None, |
| 156 | chunksize=None, |
| 157 | **kwargs, |
| 158 | ): |
| 159 | """Multiprocessed get function appropriate for Bags |
| 160 | |
| 161 | Parameters |
| 162 | ---------- |
| 163 | dsk : dict |
| 164 | dask graph |
| 165 | keys : object or list |
| 166 | Desired results from graph |
| 167 | num_workers : int |
| 168 | Number of worker processes (defaults to number of cores) |
| 169 | func_dumps : function |
| 170 | Function to use for function serialization (defaults to cloudpickle.dumps) |
| 171 | func_loads : function |
| 172 | Function to use for function deserialization (defaults to cloudpickle.loads) |
| 173 | optimize_graph : bool |
| 174 | If True [default], `fuse` is applied to the graph before computation. |
| 175 | pool : Executor or Pool |
| 176 | Some sort of `Executor` or `Pool` to use |
| 177 | initializer: function |
| 178 | Ignored if ``pool`` has been set. |
| 179 | Function to initialize a worker process before running any tasks in it. |
| 180 | chunksize: int, optional |
| 181 | Size of chunks to use when dispatching work. |
| 182 | Defaults to 6 as some batching is helpful. |
| 183 | If -1, will be computed to evenly divide ready work across workers. |
| 184 | """ |
| 185 | chunksize = chunksize or config.get("chunksize", 6) |
| 186 | pool = pool or config.get("pool", None) |
| 187 | initializer = initializer or config.get("multiprocessing.initializer", None) |
| 188 | num_workers = num_workers or config.get("num_workers", None) or CPU_COUNT |
| 189 | if pool is None: |
| 190 | # In order to get consistent hashing in subprocesses, we need to set a |
| 191 | # consistent seed for the Python hash algorithm. Unfortunately, there |
| 192 | # is no way to specify environment variables only for the Pool |
| 193 | # processes, so we have to rely on environment variables being |
| 194 | # inherited. |
| 195 | if os.environ.get("PYTHONHASHSEED") in (None, "0"): |
| 196 | # This number is arbitrary; it was chosen to commemorate |
| 197 | # https://github.com/dask/dask/issues/6640. |
| 198 | os.environ["PYTHONHASHSEED"] = "6640" |
| 199 | context = get_context() |
| 200 | initializer = partial(initialize_worker_process, user_initializer=initializer) |
| 201 | pool = ProcessPoolExecutor( |
| 202 | num_workers, mp_context=context, initializer=initializer |
| 203 | ) |
| 204 | cleanup = True |