MCPcopy
hub / github.com/dask/dask / _layer

Method _layer

dask/dataframe/dask_expr/_shuffle.py:509–538  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

507 p.append(d, fsync=True)
508
def _layer(self):
    """Return the task graph (a plain dict) for a partd-backed disk shuffle.

    The graph is built in four stages. All of them are keyed with a token
    that is generated fresh on every call:

    1. ``("zpartd-<token>",)`` -- a zero-argument task invoking the object
       returned by ``maybe_buffered_partd``. That object is configured with
       the encoder ``partd_encode_dispatch`` selects for ``self.frame._meta``.
    2. ``("shuffle-partition-<token>", i)`` -- one task per input partition
       of ``self.frame``. Each calls ``self._shuffle_group`` with that
       partition's key, ``self.partitioning_index``, ``self._partitions``
       and the stage-1 key.
    3. ``("barrier-<token>",)`` -- a single ``barrier`` task over the list
       of every stage-2 key.
    4. ``(self._name, j)`` -- one ``collect`` task per entry of
       ``self._partitions``. Each receives the stage-1 key, the partition
       label, ``self.frame._meta`` and the stage-3 key. Passing the stage-3
       key makes every collect task depend on the barrier.

    Returns
    -------
    dict
        Mapping of task keys to ``(callable, *args)`` task tuples, with the
        entries inserted in stage order (1, 2, 3, 4).
    """
    from dask.dataframe.dispatch import partd_encode_dispatch

    by = self.partitioning_index
    frame = self.frame

    # uuid1-based and independent of the inputs, so every call produces
    # a distinct set of keys (and hence a distinct partd store task).
    token = uuid.uuid1().hex

    # Stage 1: the on-disk partd store shared by the write and read tasks.
    store_key = (f"zpartd-{token}",)
    encoder = partd_encode_dispatch(frame._meta)
    graph = {store_key: (maybe_buffered_partd(encode_cls=encoder),)}

    # Stage 2: one write task per input partition.
    split_prefix = f"shuffle-partition-{token}"
    split_keys = []
    for idx, input_key in enumerate(frame.__dask_keys__()):
        split_key = (split_prefix, idx)
        graph[split_key] = (
            self._shuffle_group,
            input_key,
            by,
            self._partitions,
            store_key,
        )
        split_keys.append(split_key)

    # Stage 3: a single task that depends on every write task.
    sync_key = (f"barrier-{token}",)
    graph[sync_key] = (barrier, split_keys)

    # Stage 4: one read task per requested output partition.
    for out_idx, part in enumerate(self._partitions):
        out_key = (self._name, out_idx)
        graph[out_key] = (
            collect,
            store_key,
            part,
            frame._meta,
            sync_key,
        )

    return graph
539
540
541def _shuffle_transfer(

Callers

nothing calls this directly

Calls 3

__dask_keys__ · Method · 0.45
merge · Method · 0.45

Tested by

no test coverage detected