| 33 | |
| 34 | |
| 35 | class Array(DaskMethodsMixin): |
| 36 | __dask_scheduler__ = staticmethod( |
| 37 | named_schedulers.get("threads", named_schedulers["sync"]) |
| 38 | ) |
| 39 | __dask_optimize__ = staticmethod(lambda dsk, keys, **kwargs: dsk) |
| 40 | |
def __init__(self, expr):
    """Create the collection around ``expr``.

    The expression is kept on the private ``_expr`` attribute and is
    exposed read-only through the ``expr`` property.
    """
    self._expr = expr
| 43 | |
@property
def expr(self) -> ArrayExpr:
    """Expression object stored on this collection by ``__init__``."""
    wrapped = self._expr
    return wrapped
| 47 | |
@property
def _name(self):
    """Name of the wrapped expression, i.e. ``self.expr._name``."""
    expression = self.expr
    return expression._name
| 51 | |
def __dask_postcompute__(self):
    """Return ``finalize`` paired with an empty tuple of extra arguments."""
    extra_args = ()
    return finalize, extra_args
| 54 | |
def __dask_postpersist__(self):
    """Return ``from_graph`` and the arguments describing this collection.

    The expression is lowered completely first; the argument tuple is
    then built from the lowered expression's ``_meta``, ``chunks``,
    flattened keys, and the ``key_split`` of its ``_name``, in that order.
    """
    lowered = self.expr.lower_completely()
    meta = lowered._meta
    chunks = lowered.chunks
    # FIXME: This is using keys of the unoptimized graph
    flat_keys = list(flatten(lowered.__dask_keys__()))
    name_prefix = key_split(lowered._name)
    rebuild_args = (meta, chunks, flat_keys, name_prefix)
    return from_graph, rebuild_args
| 64 | |
@property
def dask(self):
    """Graph of this collection, exactly as ``__dask_graph__`` returns it."""
    graph = self.__dask_graph__()
    return graph
| 68 | |
def __dask_graph__(self):
    """Lower the expression completely and return the lowered graph."""
    return self.expr.lower_completely().__dask_graph__()
| 72 | |
def __dask_keys__(self):
    """Lower the expression completely and return the lowered keys."""
    return self.expr.lower_completely().__dask_keys__()
| 76 | |
def __dask_tokenize__(self):
    """Return the pair ``("Array", <expression name>)`` used as the token."""
    expr_name = self.expr._name
    return ("Array", expr_name)
| 79 | |
def compute(self, **kwargs):
    """Optimize this collection, then run ``DaskMethodsMixin.compute`` on it.

    All keyword arguments are forwarded unchanged to the mixin's
    ``compute``.
    """
    # Look up the base implementation first, then optimize, keeping the
    # same evaluation order as a single inline call expression.
    base_compute = DaskMethodsMixin.compute
    optimized = self.optimize()
    return base_compute(optimized, **kwargs)
| 82 | |
def persist(self, **kwargs):
    """Optimize this collection, then run ``DaskMethodsMixin.persist`` on it.

    All keyword arguments are forwarded unchanged to the mixin's
    ``persist``.
    """
    # Look up the base implementation first, then optimize, keeping the
    # same evaluation order as a single inline call expression.
    base_persist = DaskMethodsMixin.persist
    optimized = self.optimize()
    return base_persist(optimized, **kwargs)
| 85 | |
def optimize(self):
    """Return ``new_collection`` applied to the optimized expression."""
    optimized_expr = self.expr.optimize()
    return new_collection(optimized_expr)
| 88 | |
def simplify(self):
    """Return ``new_collection`` applied to the simplified expression."""
    simplified_expr = self.expr.simplify()
    return new_collection(simplified_expr)
| 91 | |
| 92 | @property |
no test coverage detected
searching dependent graphs…