```python
def unpack_collections(*args, traverse=True):
    """Extract collections in preparation for compute/persist/etc...

    Intended use is to find all collections in a set of (possibly nested)
    python objects, do something to them (compute, etc...), then repackage them
    in equivalent python objects.

    Parameters
    ----------
    *args
        Any number of objects. If it is a dask collection, it's extracted and
        added to the list of collections returned. By default, python builtin
        collections are also traversed to look for dask collections (for more
        information see the ``traverse`` keyword).
    traverse : bool, optional
        If True (default), builtin python collections are traversed looking for
        any dask collections they might contain.

    Returns
    -------
    collections : list
        A list of all dask collections contained in ``args``
    repack : callable
        A function to call on the transformed collections to repackage them as
        they were in the original ``args``.
    """

    collections = []
    repack_dsk = {}

    collections_token = uuid.uuid4().hex

    def _unpack(expr):
        if is_dask_collection(expr):
            tok = tokenize(expr)
            if tok not in repack_dsk:
                repack_dsk[tok] = Task(
                    tok, getitem, TaskRef(collections_token), len(collections)
                )
                collections.append(expr)
            return TaskRef(tok)

        tok = uuid.uuid4().hex
        tsk: DataNode | Task  # type: ignore[annotation-unchecked]
        if not traverse:
            tsk = DataNode(None, expr)
        else:
            # Treat iterators like lists
            typ = list if isinstance(expr, Iterator) else type(expr)
            if typ in (list, tuple, set):
                tsk = Task(tok, typ, List(*[_unpack(i) for i in expr]))
            elif typ in (dict, OrderedDict):
                tsk = Task(
                    tok, typ, Dict({_unpack(k): _unpack(v) for k, v in expr.items()})
                )
            elif dataclasses.is_dataclass(expr) and not isinstance(expr, type):
                tsk = Task(
                    tok,
```
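The contract described in the docstring (return the extracted collections together with a `repack` callable) can be illustrated without dask. The following is a minimal, simplified sketch, not dask's implementation: the `Lazy` class and `unpack_sketch` function are hypothetical, `id()` stands in for `tokenize` when deduplicating, and repacking rebuilds the original structure with closures rather than a task graph. It keeps two behaviors visible in the source: the `traverse` flag and treating iterators like lists.

```python
import dataclasses
from collections import OrderedDict
from collections.abc import Iterator
from typing import Any, Callable


class Lazy:
    """Hypothetical stand-in for a dask collection (not a dask class)."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def compute(self) -> Any:
        return self.value


def unpack_sketch(*args: Any, traverse: bool = True):
    """Hypothetical, closure-based analogue of unpack_collections."""
    collections: list[Lazy] = []
    # id(collection) -> position in `collections`; ids stay unique because
    # every collection is kept alive by the `collections` list.
    index: dict[int, int] = {}

    def _unpack(expr: Any) -> Callable[[list], Any]:
        # Each call returns a builder: results list -> rebuilt object.
        if isinstance(expr, Lazy):
            if id(expr) not in index:
                index[id(expr)] = len(collections)
                collections.append(expr)
            pos = index[id(expr)]
            return lambda results: results[pos]
        if not traverse:
            return lambda results: expr  # keep the object untouched
        # Treat iterators like lists, as the original code does.
        typ = list if isinstance(expr, Iterator) else type(expr)
        if typ in (list, tuple, set):
            parts = [_unpack(i) for i in expr]
            return lambda results: typ(p(results) for p in parts)
        if typ in (dict, OrderedDict):
            items = [(_unpack(k), _unpack(v)) for k, v in expr.items()]
            return lambda results: typ(
                (k(results), v(results)) for k, v in items
            )
        if dataclasses.is_dataclass(expr) and not isinstance(expr, type):
            # Assumes every field is an __init__ argument.
            parts_by_name = {
                f.name: _unpack(getattr(expr, f.name))
                for f in dataclasses.fields(expr)
            }
            return lambda results: typ(
                **{name: p(results) for name, p in parts_by_name.items()}
            )
        return lambda results: expr  # any other object is passed through

    builders = [_unpack(a) for a in args]

    def repack(results):
        results = list(results)
        return tuple(build(results) for build in builders)

    return collections, repack


a, b = Lazy(1), Lazy(2)
collections, repack = unpack_sketch([a, {"k": b}], a, 5)
results = [c.compute() for c in collections]  # "do something to them"
print(repack(results))  # → ([1, {'k': 2}], 1, 5)
```

Because `a` appears twice but is recorded once, `repack` fans its single result back out to both positions.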
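The function itself does not use closures: it fills `repack_dsk`, a small task graph in which each collection's token maps to a `getitem` task on a placeholder key, `collections_token`. The visible code references that key but never assigns it, so presumably the computed results are supplied under it when `repack` is called. The sketch below imitates that design with plain `(func, *args)` tuples. Every name in it (`Ref`, `_build`, `evaluate`, `unpack_graph`, `Named`, and the `is_collection`/`token_of` callbacks) is hypothetical; `Ref` loosely plays the role of `TaskRef`, and the toy `evaluate` stands in for a real scheduler.

```python
import uuid
from operator import getitem
from typing import Any, Callable


class Ref:
    """Hypothetical marker: 'the value of graph key `key`' (cf. TaskRef)."""

    def __init__(self, key: str) -> None:
        self.key = key


def _build(typ: type, *items: Any) -> Any:
    # Rebuild a list/tuple from already-evaluated items.
    return typ(items)


def evaluate(graph: dict, key: str) -> Any:
    """Toy evaluator: tuples are (func, *args) tasks, anything else is data."""
    node = graph[key]
    if not isinstance(node, tuple):
        return node
    func, *args = node
    resolved = [evaluate(graph, a.key) if isinstance(a, Ref) else a for a in args]
    return func(*resolved)


def unpack_graph(
    *args: Any,
    is_collection: Callable[[Any], bool],
    token_of: Callable[[Any], str],
):
    collections: list = []
    graph: dict = {}
    placeholder = uuid.uuid4().hex  # bound to the results inside repack()

    def _unpack(expr: Any) -> Any:
        if is_collection(expr):
            tok = token_of(expr)
            if tok not in graph:
                # Index taken before appending == this collection's position.
                graph[tok] = (getitem, Ref(placeholder), len(collections))
                collections.append(expr)
            return Ref(tok)
        if type(expr) in (list, tuple):
            tok = uuid.uuid4().hex
            graph[tok] = (_build, type(expr), *[_unpack(i) for i in expr])
            return Ref(tok)
        return expr  # plain values are embedded directly as task arguments

    out = uuid.uuid4().hex
    graph[out] = (_build, tuple, *[_unpack(a) for a in args])

    def repack(results):
        g = dict(graph)  # copy so the template graph can be reused
        g[placeholder] = list(results)  # a list, so evaluate() treats it as data
        return evaluate(g, out)

    return collections, repack


class Named:
    """Hypothetical collection-like object; its token is just its name."""

    def __init__(self, name: str) -> None:
        self.name = name


x1, x2, y = Named("x"), Named("x"), Named("y")
cols, repack = unpack_graph(
    [x1, (y, 3)], x2,
    is_collection=lambda o: isinstance(o, Named),
    token_of=lambda o: o.name,
)
print(len(cols))           # → 2 (x1 and x2 share the token "x")
print(repack(["X", "Y"]))  # → (['X', ('Y', 3)], 'X')
```

Reading `len(collections)` before appending is what ties each token to its collection's slot in `results`. Because `tokenize` is content-based, two distinct but equivalent dask collections would likewise share one slot; here `token_of` only imitates that with a name.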