Do inputs come from the same parents, modulo blockwise?
(*exprs)
| 3203 | |
| 3204 | |
def are_co_aligned(*exprs):
    """Check whether the inputs share a single ancestor, ignoring blockwise steps.

    Starting from every input whose ``ndim`` is greater than zero, the
    expression graph is walked depth-first through ``Blockwise``,
    ``CumulativeAggregations`` and ``Reduction`` nodes.  ``IO`` expressions
    and any node of another kind are recorded as ancestors; scalar nodes,
    ``_DelayedExpr`` and ``Isin`` nodes are dropped without being walked.

    Returns ``True`` when the recorded ancestors yield at most one distinct
    partial token, ``False`` otherwise.  Divisions and npartitions are not
    compared at all.
    """

    from dask.dataframe.dask_expr._cumulative import CumulativeAggregations
    from dask.dataframe.dask_expr._reductions import Reduction

    visited = set()
    # Scalars can always be broadcasted, so they never seed the traversal
    pending = [expr for expr in exprs if expr.ndim > 0]
    roots = []
    while pending:
        node = pending.pop()
        name = node._name
        if name in visited:
            # Reached through another path already
            continue
        visited.add(name)

        if isinstance(node, IO):
            roots.append(node)
            continue

        # Scalars are valid ancestors that are always broadcastable, so
        # they are not walked through; _DelayedExpr and Isin nodes are
        # skipped in the same way.
        if node.ndim == 0 or isinstance(node, (_DelayedExpr, Isin)):
            continue

        if isinstance(node, (Blockwise, CumulativeAggregations, Reduction)):
            # TODO: Capture this in inheritance logic
            pending.extend(node.dependencies())
        else:
            roots.append(node)

    # Account for column projection within IO expressions
    # NOTE(review): presumably the listed operands are left out of the
    # token -- confirm against _tokenize_partial
    tokens = set()
    for root in roots:
        tokens.add(
            _tokenize_partial(root, ["columns", "_series", "_dataset_info_cache"])
        )

    # Don't check divisions or npartitions at all
    return len(tokens) < 2
| 3243 | |
| 3244 | |
| 3245 | ## Utilities for Expr fusion |
searching dependent graphs…