(self)
| 507 | p.append(d, fsync=True) |
| 508 | |
def _layer(self):
    """Build the low-level task graph for a partd-backed disk shuffle.

    Every key in the graph embeds one token from ``uuid.uuid1().hex``, which
    is generated anew on each call. The graph is assembled in four stages,
    and the returned dict's insertion order follows them:

    1. A single task whose key is ``("zpartd-<token>",)``. It holds the
       store object returned by ``maybe_buffered_partd``, configured with
       the encoder that ``partd_encode_dispatch`` picks for the frame's meta.
    2. One ``self._shuffle_group`` task per key of ``self.frame``, keyed
       ``("shuffle-partition-<token>", i)``. These tasks partition the data
       on disk.
    3. A single ``barrier`` task, keyed ``("barrier-<token>",)``. It takes
       the list of every stage-2 key.
    4. One ``collect`` task per entry ``k`` of ``self._partitions``, keyed
       ``(self._name, j)``. Each receives the store key, ``k``, the frame's
       meta and the barrier key.

    Returns:
        dict: the assembled task graph.
    """
    from dask.dataframe.dispatch import partd_encode_dispatch

    partition_column = self.partitioning_index
    frame = self.frame

    # A fresh token per call keeps keys from separate invocations distinct.
    token = uuid.uuid1().hex

    store_key = (f"zpartd-{token}",)
    encoder = partd_encode_dispatch(frame._meta)
    graph = {store_key: (maybe_buffered_partd(encode_cls=encoder),)}

    # Stage 2: partition each input chunk into the on-disk store.
    group_prefix = f"shuffle-partition-{token}"
    group_keys = []
    for idx, input_key in enumerate(frame.__dask_keys__()):
        task_key = (group_prefix, idx)
        graph[task_key] = (
            self._shuffle_group,
            input_key,
            partition_column,
            self._partitions,
            store_key,
        )
        group_keys.append(task_key)

    # Stage 3: a single barrier that depends on every partitioning task.
    sync_key = (f"barrier-{token}",)
    graph[sync_key] = (barrier, group_keys)

    # Stage 4: one collect task per output partition, gated on the barrier.
    for out_idx, part in enumerate(self._partitions):
        graph[(self._name, out_idx)] = (
            collect,
            store_key,
            part,
            frame._meta,
            sync_key,
        )

    return graph
| 539 | |
| 540 | |
| 541 | def _shuffle_transfer( |
nothing calls this directly
no test coverage detected