| 3144 | |
| 3145 | |
class DelayedsExpr(Expr):
    """Expression whose operands are a sequence of ``TaskRef`` or ``Delayed`` objects.

    The graph layer exposes operand number ``ix`` under the key
    ``(self._name, ix)``; each of those entries is an ``Alias`` pointing at
    the key of the underlying object.
    """

    def __str__(self):
        """Render the class name followed by the first operand in parentheses."""
        cls_name = type(self).__name__
        first_operand = self.operands[0]
        return f"{cls_name}({first_operand})"

    @functools.cached_property
    def _name(self):
        """Key prefix of this expression, built from its deterministic token."""
        token = self.deterministic_token
        return f"delayed-container-{token}"

    def _layer(self) -> dict:
        """Build the graph that maps ``(self._name, ix)`` onto each operand.

        The type of the first operand alone decides how all operands are
        handled.

        Returns
        -------
        dict
            Graph holding one ``Alias`` per operand under ``(self._name, ix)``.
            For ``Delayed`` operands the aliases are added on top of whatever
            ``__dask_graph__()`` of the optimized expression returned.

        Raises
        ------
        TypeError
            If the first operand is neither a ``TaskRef`` nor a ``Delayed``.
        """
        from dask.delayed import Delayed

        head = self.operands[0]

        if isinstance(head, TaskRef):
            # Task references only need an alias each; the graph is keyed by
            # the key the Alias object itself reports.
            graph = {}
            for position, ref in enumerate(self.operands):
                alias = Alias((self._name, position), ref.key)
                graph[alias.key] = alias
            return graph

        if isinstance(head, Delayed):
            optimized = collections_to_expr(self.operands).optimize()
            output_keys = optimized.__dask_keys__()
            graph = optimized.__dask_graph__()
            # A lot of dask-expr code ignores __dask_keys__ and rebuilds keys
            # itself by enumerating partitions as (name, index). Exposing
            # __dask_keys__() directly is therefore not enough: every output
            # key has to be re-published under this expression's own name.
            for position, output_key in enumerate(output_keys):
                alias_key = (self._name, position)
                graph[alias_key] = Alias(alias_key, output_key[0])
            return graph

        raise TypeError("Expected a Delayed or Future object")

    def _divisions(self):
        """Return a tuple of ``None`` with one more entry than there are operands."""
        boundary_count = len(self.operands) + 1
        return (None,) * boundary_count

    @property
    def ndim(self):
        """Number of dimensions reported for this expression; always ``0``."""
        return 0
| 3183 | |
| 3184 | |
| 3185 | def is_broadcastable(dfs, s): |
no outgoing calls
no test coverage detected
searching dependent graphs…