Function get

dask/multiprocessing.py:147–252

Multiprocessed get function appropriate for Bags.

(
    dsk: Mapping,
    keys: Sequence[Key] | Key,
    num_workers=None,
    func_loads=None,
    func_dumps=None,
    optimize_graph=True,
    pool=None,
    initializer=None,
    chunksize=None,
    **kwargs,
)
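Most keyword arguments in this signature default to `None`, and the function body resolves them with an `explicit or configured or default` chain. The following is a minimal sketch of that pattern, not dask's implementation: `resolve_options` and the `settings` dict are hypothetical stand-ins for the function body and dask's configuration, and `os.cpu_count()` is assumed in place of dask's `CPU_COUNT`.

```python
import os


def resolve_options(chunksize=None, num_workers=None, settings=None):
    """Mimic the `explicit or configured or default` chain (illustrative only)."""
    settings = settings or {}  # hypothetical stand-in for dask's config
    # An explicit truthy argument wins; otherwise the configured value,
    # otherwise the hard-coded default of 6.
    chunksize = chunksize or settings.get("chunksize", 6)
    # Same chain, with the machine's core count as the last resort.
    num_workers = (
        num_workers or settings.get("num_workers", None) or os.cpu_count() or 1
    )
    return chunksize, num_workers


defaults = resolve_options()
from_settings = resolve_options(settings={"chunksize": 10, "num_workers": 2})
explicit = resolve_options(chunksize=-1, num_workers=3, settings={"chunksize": 10})
# Because the chain uses `or`, a falsy explicit value such as 0 is treated
# the same as None and falls through to the next candidate.
zero_is_falsy = resolve_options(chunksize=0, num_workers=1)
```

One consequence of using `or` rather than an `is None` check is that a falsy explicit value such as `chunksize=0` cannot be distinguished from "not provided".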

Source from the content-addressed store, hash-verified

def get(
    dsk: Mapping,
    keys: Sequence[Key] | Key,
    num_workers=None,
    func_loads=None,
    func_dumps=None,
    optimize_graph=True,
    pool=None,
    initializer=None,
    chunksize=None,
    **kwargs,
):
    """Multiprocessed get function appropriate for Bags

    Parameters
    ----------
    dsk : dict
        dask graph
    keys : object or list
        Desired results from graph
    num_workers : int
        Number of worker processes (defaults to number of cores)
    func_dumps : function
        Function to use for function serialization (defaults to cloudpickle.dumps)
    func_loads : function
        Function to use for function deserialization (defaults to cloudpickle.loads)
    optimize_graph : bool
        If True [default], `fuse` is applied to the graph before computation.
    pool : Executor or Pool
        Some sort of `Executor` or `Pool` to use
    initializer: function
        Ignored if ``pool`` has been set.
        Function to initialize a worker process before running any tasks in it.
    chunksize: int, optional
        Size of chunks to use when dispatching work.
        Defaults to 6 as some batching is helpful.
        If -1, will be computed to evenly divide ready work across workers.
    """
    chunksize = chunksize or config.get("chunksize", 6)
    pool = pool or config.get("pool", None)
    initializer = initializer or config.get("multiprocessing.initializer", None)
    num_workers = num_workers or config.get("num_workers", None) or CPU_COUNT
    if pool is None:
        # In order to get consistent hashing in subprocesses, we need to set a
        # consistent seed for the Python hash algorithm. Unfortunately, there
        # is no way to specify environment variables only for the Pool
        # processes, so we have to rely on environment variables being
        # inherited.
        if os.environ.get("PYTHONHASHSEED") in (None, "0"):
            # This number is arbitrary; it was chosen to commemorate
            # https://github.com/dask/dask/issues/6640.
            os.environ["PYTHONHASHSEED"] = "6640"
        context = get_context()
        initializer = partial(initialize_worker_process, user_initializer=initializer)
        pool = ProcessPoolExecutor(
            num_workers, mp_context=context, initializer=initializer
        )
        cleanup = True
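The comment in the listing explains that a fixed `PYTHONHASHSEED` is set in the parent's environment so that worker processes inherit it and hash consistently. The sketch below illustrates that idea using only the standard library. It is not dask's code: `ensure_hash_seed` and `child_hash` are hypothetical helpers, and a fresh interpreter started with `subprocess` stands in for a pool worker.

```python
import os
import subprocess
import sys


def ensure_hash_seed(env, seed="6640"):
    """Set PYTHONHASHSEED in `env` when it is unset or "0", mirroring the listing."""
    if env.get("PYTHONHASHSEED") in (None, "0"):
        env["PYTHONHASHSEED"] = seed
    return env


def child_hash(env):
    """Start a fresh interpreter with `env` and return hash('dask') as text."""
    result = subprocess.run(
        [sys.executable, "-c", "print(hash('dask'))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Work on a copy of the environment so this process is left untouched, and
# drop any existing seed so the example starts from the "unset" case.
env = dict(os.environ)
env.pop("PYTHONHASHSEED", None)
env = ensure_hash_seed(env)

# Both child interpreters inherit the same seed, so their string hashes agree.
first = child_hash(env)
second = child_hash(env)
```

The listing mutates `os.environ` directly because, as its comment notes, there is no way to set environment variables only for the pool's processes. The sketch passes an explicit `env` mapping instead so that it has no side effects on the calling process.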

Calls 10

ensure_dict · Function · 0.90
cull · Function · 0.90
fuse · Function · 0.90
get_async · Function · 0.90
get_context · Function · 0.85
warn · Function · 0.85
shutdown · Method · 0.80
get · Method · 0.45
__dask_graph__ · Method · 0.45