Get an instance of the appropriate lock for a certain situation based on scheduler used.
(collection=None, scheduler=None)
| 1347 | |
| 1348 | |
| 1349 | def get_scheduler_lock(collection=None, scheduler=None): |
| 1350 | """Get an instance of the appropriate lock for a certain situation based on |
| 1351 | scheduler used.""" |
| 1352 | from dask import multiprocessing |
| 1353 | from dask.base import get_scheduler |
| 1354 | |
| 1355 | actual_get = get_scheduler(collections=[collection], scheduler=scheduler) |
| 1356 | |
| 1357 | if actual_get == multiprocessing.get: |
| 1358 | return multiprocessing.get_context().Manager().Lock() |
| 1359 | else: |
| 1360 | # if this is a distributed client, we need to lock on |
| 1361 | # the level between processes, SerializableLock won't work |
| 1362 | try: |
| 1363 | import distributed.lock |
| 1364 | from distributed.worker import get_client |
| 1365 | |
| 1366 | client = get_client() |
| 1367 | except (ImportError, ValueError): |
| 1368 | pass |
| 1369 | else: |
| 1370 | if actual_get == client.get: |
| 1371 | return distributed.lock.Lock() |
| 1372 | |
| 1373 | return SerializableLock() |
| 1374 | |
| 1375 | |
| 1376 | def ensure_dict(d: Mapping[K, V], *, copy: bool = False) -> dict[K, V]: |
searching dependent graphs…