Optimize several dask collections at once. Returns equivalent dask collections that all share the same merged and optimized underlying graph. This can be useful when converting multiple collections to delayed objects, or when manually applying optimizations at strategic points.
(*args, traverse=True, **kwargs)
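The summary says the returned collections all share one merged and optimized graph. The following is a minimal, stdlib-only sketch of that idea under stated assumptions. It is not dask's implementation. A "graph" here is a plain dict that maps string keys to literal values or `(callable, *args)` tuples, loosely modeled on dask's classic dict graphs. The only optimization pass is culling tasks that no requested output depends on. `ToyCollection`, its `postpersist` method, `cull`, `toy_optimize` and `get` are all hypothetical names. `postpersist` only imitates the `(rebuild, extra_args)` pair that the source unpacks from `__dask_postpersist__()`.

```python
from dataclasses import dataclass
from typing import Any, Callable

# Toy graph format (hypothetical, loosely modeled on classic dask dict graphs):
# key -> literal value, or key -> (callable, *args), where any string arg that
# names another key in the graph is treated as a dependency.
Graph = dict[str, Any]


@dataclass
class ToyCollection:
    """Hypothetical stand-in for a dask collection: a graph plus one output key."""

    graph: Graph
    key: str

    def postpersist(self) -> tuple[Callable[..., Any], tuple[Any, ...]]:
        # Same shape as the (rebuild, extra_args) pair the source gets from
        # __dask_postpersist__(); here the only extra argument is the key.
        return ToyCollection, (self.key,)


def is_task(value: Any) -> bool:
    return isinstance(value, tuple) and len(value) > 0 and callable(value[0])


def deps(task: Any, graph: Graph) -> list[str]:
    """Keys of other tasks that this task reads."""
    if not is_task(task):
        return []
    return [a for a in task[1:] if isinstance(a, str) and a in graph]


def cull(graph: Graph, outputs: list[str]) -> Graph:
    """Toy optimization pass: keep only tasks reachable from the output keys."""
    keep: set[str] = set()
    stack = list(outputs)
    while stack:
        key = stack.pop()
        if key not in keep:
            keep.add(key)
            stack.extend(deps(graph[key], graph))
    return {k: v for k, v in graph.items() if k in keep}


def toy_optimize(*colls: ToyCollection) -> tuple[ToyCollection, ...]:
    merged: Graph = {}
    for c in colls:
        merged.update(c.graph)  # assumes equal keys always hold equal tasks
    optimized = cull(merged, [c.key for c in colls])
    out = []
    for c in colls:
        rebuild, extra = c.postpersist()
        out.append(rebuild(optimized, *extra))  # every result gets the same dict
    return tuple(out)


def get(graph: Graph, key: str) -> Any:
    """Naive recursive executor for the toy graph (no caching)."""
    task = graph[key]
    if not is_task(task):
        return task
    args = [get(graph, a) if isinstance(a, str) and a in graph else a for a in task[1:]]
    return task[0](*args)


# Usage: two collections that share the input key "x".
inc = ToyCollection({"x": 5, "inc": (lambda v: v + 1, "x")}, "inc")
dbl = ToyCollection(
    {"x": 5, "dbl": (lambda v: v * 2, "x"), "unused": (lambda v: v - 1, "x")}, "dbl"
)
inc2, dbl2 = toy_optimize(inc, dbl)
print(inc2.graph is dbl2.graph)  # True
print(sorted(inc2.graph))  # ['dbl', 'inc', 'x']
print(get(inc2.graph, "inc"), get(dbl2.graph, "dbl"))  # 6 10
```

Because `toy_optimize` hands the same dict to every rebuilt collection, `inc2.graph is dbl2.graph` holds, which mirrors the sharing property the docstring describes. The `unused` task is dropped by the cull pass because neither output depends on it.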
    def optimize(*args, traverse=True, **kwargs):
        """Optimize several dask collections at once.

        Returns equivalent dask collections that all share the same merged and
        optimized underlying graph. This can be useful when converting multiple
        collections to delayed objects, or when manually applying optimizations
        at strategic points.

        Note that in most cases you shouldn't need to call this function directly.

        .. warning::

            This function triggers a materialization of the collections and loses
            any annotations attached to HLG layers.

        Parameters
        ----------
        *args : objects
            Any number of objects. If a dask object, its graph is optimized and
            merged with those of all other dask objects before returning an
            equivalent dask collection. Non-dask arguments are passed through
            unchanged.
        traverse : bool, optional
            By default dask traverses built-in Python collections looking for dask
            objects passed to ``optimize``. For large collections this can be
            expensive. If no dask objects are nested inside the arguments, set
            ``traverse=False`` to skip this traversal.
        optimizations : list of callables, optional
            Additional optimization passes to perform.
        **kwargs
            Extra keyword arguments to forward to the optimization passes.

        Examples
        --------
        >>> import dask
        >>> import dask.array as da
        >>> a = da.arange(10, chunks=2).sum()
        >>> b = da.arange(10, chunks=2).mean()
        >>> a2, b2 = dask.optimize(a, b)

        >>> a2.compute() == a.compute()
        np.True_
        >>> b2.compute() == b.compute()
        np.True_
        """
        # TODO: This API is problematic. The postpersist-based approach forces us
        # to materialize the graph, and most low-level optimizations materialize
        # it as well.
        collections, repack = unpack_collections(*args, traverse=traverse)
        if not collections:
            return args

        dsk = collections_to_expr(collections)

        postpersists = []
        for a in collections:
            r, s = a.__dask_postpersist__()
            postpersists.append(r(dsk.__dask_graph__(), *s))
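The `traverse` parameter and the `unpack_collections(...)` / `repack` pair in the source decide which arguments count as collections and how results are put back into the original argument structure. Their internals are not shown on this page, so the following is a hypothetical sketch of that contract only, not dask's code. `unpack`, `Box` and `is_box` are made-up names, and the sketch walks just `list`, `tuple`, `set` and `dict`. Which container types dask itself walks is not covered here.

```python
from typing import Any, Callable


def unpack(
    *args: Any, is_collection: Callable[[Any], bool], traverse: bool = True
) -> tuple[list[Any], Callable[[list[Any]], tuple[Any, ...]]]:
    """Hypothetical analogue of unpacking collections out of arbitrary args.

    Returns (found, repack): the distinct collections in first-seen order, and a
    function that rebuilds ``args`` with each collection replaced by its result.
    """
    found: list[Any] = []
    index: dict[int, int] = {}  # id(collection) -> position in found

    def visit(obj: Any) -> None:
        if is_collection(obj):
            if id(obj) not in index:
                index[id(obj)] = len(found)
                found.append(obj)
        elif traverse:
            if type(obj) in (list, tuple, set):
                for item in obj:
                    visit(item)
            elif type(obj) is dict:
                for value in obj.values():
                    visit(value)

    for arg in args:
        visit(arg)  # top-level arguments are always inspected

    def rebuild(obj: Any, results: list[Any]) -> Any:
        if is_collection(obj):
            return results[index[id(obj)]]
        if traverse:
            if type(obj) in (list, tuple, set):
                return type(obj)(rebuild(item, results) for item in obj)
            if type(obj) is dict:
                return {k: rebuild(v, results) for k, v in obj.items()}
        return obj  # untraversed containers and plain values pass through as-is

    def repack(results: list[Any]) -> tuple[Any, ...]:
        return tuple(rebuild(arg, results) for arg in args)

    return found, repack


class Box:
    """Toy 'collection' used only for the demonstration below."""

    def __init__(self, value: int) -> None:
        self.value = value


def is_box(obj: Any) -> bool:
    return isinstance(obj, Box)


a, b = Box(1), Box(2)

# traverse=True: b is found inside the list, and a is deduplicated.
found, repack = unpack(a, [b, "text"], {"k": a}, is_collection=is_box)
out = repack([Box(10 * box.value) for box in found])

# traverse=False: only top-level arguments are checked, so b is never found.
shallow, repack_shallow = unpack(a, [b], is_collection=is_box, traverse=False)
```

With `traverse=False`, the list argument is returned unchanged, so `b` inside it is never replaced; this is the cost/coverage trade-off the `traverse` docstring describes. When nothing is found, the source short-circuits with `return args` before building any graph.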