An expression that guarantees that all keys are suffixed with a unique id. This can be used to break a common subexpression apart.
| 1351 | |
| 1352 | |
| 1353 | class ProhibitReuse(Expr): |
"""
An expression that guarantees that all keys are suffixed with a unique id.
This can be used to break a common subexpression apart.
"""

# Operand names of this expression. The methods of this class read the single
# operand as ``self.expr``; presumably the ``Expr`` base class maps each entry
# of ``_parameters`` to an attribute of that name (TODO confirm against Expr).
_parameters = ["expr"]
# Wrapped-expression types for which ``_simplify_down`` keeps this wrapper in
# place; when ``self.expr`` is of any other type, simplification replaces the
# wrapper with the bare wrapped expression.
_ALLOWED_TYPES = [HLGExpr, LLGExpr, HLGFinalizeCompute, _HLGExprSequence]
| 1361 | |
def __dask_keys__(self):
    """Return the wrapped expression's keys with this instance's suffix.

    The key structure returned by ``self.expr.__dask_keys__()`` is passed
    through ``_modify_keys``, so nested lists and tuple keys keep their
    shape while their names gain the unique suffix.
    """
    original_keys = self.expr.__dask_keys__()
    return self._modify_keys(original_keys)
| 1364 | |
@staticmethod
def _identity(obj):
    """Return ``obj`` unchanged (a no-op callable)."""
    return obj
| 1368 | |
@functools.cached_property
def _suffix(self):
    """Random hexadecimal token appended to every key of this instance.

    Generated from a fresh ``uuid4`` on first access and cached on the
    instance afterwards, so every key rewritten by the same instance
    carries the same suffix.
    """
    token = uuid.uuid4()
    return token.hex
| 1372 | |
def _modify_keys(self, k):
    """Append this instance's unique suffix to a key or a key structure.

    - ``list``: every element is rewritten recursively; a new list is returned.
    - ``tuple``: only the first element is rewritten (recursively); the
      remaining elements are kept as-is.
    - anything else: returned as the string ``"<key>-<suffix>"``; ``int`` and
      ``float`` keys are converted with ``str`` first.
    """
    if isinstance(k, list):
        return list(map(self._modify_keys, k))
    if isinstance(k, tuple):
        # Only the name part of a tuple key is renamed; indices stay intact.
        renamed_head = self._modify_keys(k[0])
        return (renamed_head,) + k[1:]
    text = str(k) if isinstance(k, (int, float)) else k
    return f"{text}-{self._suffix}"
| 1381 | |
def _simplify_down(self):
    """Unwrap this expression unless it wraps one of ``_ALLOWED_TYPES``.

    Returns the wrapped expression when its type is not allowed, and
    ``None`` otherwise (presumably "no rewrite" for the simplifier —
    confirm against ``Expr``).
    """
    # FIXME: Shuffling cannot be rewritten since the barrier key is
    # hardcoded. Skipping this here should do the trick most of the time
    wrapped = self.expr
    if isinstance(wrapped, tuple(self._ALLOWED_TYPES)):
        return None
    return wrapped
| 1390 | |
| 1391 | def __dask_graph__(self): |
| 1392 | try: |
| 1393 | from distributed.shuffle._core import P2PBarrierTask |
| 1394 | except ModuleNotFoundError: |
| 1395 | P2PBarrierTask = type(None) |
| 1396 | dsk = convert_legacy_graph(self.expr.__dask_graph__()) |
| 1397 | |
| 1398 | subs = {old_key: self._modify_keys(old_key) for old_key in dsk} |
| 1399 | dsk2 = {} |
| 1400 | for old_key, new_key in subs.items(): |
| 1401 | t = dsk[old_key] |
| 1402 | if isinstance(t, P2PBarrierTask): |
| 1403 | warnings.warn( |
| 1404 | "Cannot block reusing for graphs including a " |
| 1405 | "P2PBarrierTask. This may cause unexpected results. " |
| 1406 | "This typically happens when converting a dask " |
| 1407 | "DataFrame to delayed objects.", |
| 1408 | UserWarning, |
| 1409 | ) |
| 1410 | return dsk |
no outgoing calls
searching dependent graphs…