Reduce collection with reduction operators.

Parameters
----------
perpartition: function
    reduction to apply to each partition
aggregate: function
    reduction to apply to the results of all partitions
split_every: int (optional)
    Group partitions into groups of this size while performing reduction. Defaults to 8.
(
self, perpartition, aggregate, split_every=None, out_type=Item, name=None
)
| 1005 | return self.reduction(func, agg, out_type=Bag, name="distinct") |
| 1006 | |
| 1007 | def reduction( |
| 1008 | self, perpartition, aggregate, split_every=None, out_type=Item, name=None |
| 1009 | ): |
| 1010 | """Reduce collection with reduction operators. |
| 1011 | |
| 1012 | Parameters |
| 1013 | ---------- |
| 1014 | perpartition: function |
| 1015 | reduction to apply to each partition |
| 1016 | aggregate: function |
| 1017 | reduction to apply to the results of all partitions |
| 1018 | split_every: int (optional) |
| 1019 | Group partitions into groups of this size while performing reduction |
| 1020 | Defaults to 8 |
| 1021 | out_type: {Bag, Item} |
| 1022 | The out type of the result, Item if a single element, Bag if a list |
| 1023 | of elements. Defaults to Item. |
| 1024 | |
| 1025 | Examples |
| 1026 | -------- |
| 1027 | >>> import dask.bag as db |
| 1028 | >>> b = db.from_sequence(range(10)) |
| 1029 | >>> b.reduction(sum, sum).compute() |
| 1030 | 45 |
| 1031 | """ |
| 1032 | if split_every is None: |
| 1033 | split_every = 8 |
| 1034 | if split_every is False: |
| 1035 | split_every = self.npartitions |
| 1036 | |
| 1037 | token = tokenize(self, perpartition, aggregate, split_every) |
| 1038 | a = f"{name or funcname(perpartition)}-part-{token}" |
| 1039 | is_last = self.npartitions == 1 |
| 1040 | dsk = { |
| 1041 | (a, i): (empty_safe_apply, perpartition, (self.name, i), is_last) |
| 1042 | for i in range(self.npartitions) |
| 1043 | } |
| 1044 | k = self.npartitions |
| 1045 | b = a |
| 1046 | fmt = f"{name or funcname(aggregate)}-aggregate-{token}" |
| 1047 | depth = 0 |
| 1048 | |
| 1049 | while k > split_every: |
| 1050 | c = fmt + str(depth) |
| 1051 | for i, inds in enumerate(partition_all(split_every, range(k))): |
| 1052 | dsk[(c, i)] = ( |
| 1053 | empty_safe_aggregate, |
| 1054 | aggregate, |
| 1055 | [(b, j) for j in inds], |
| 1056 | False, |
| 1057 | ) |
| 1058 | |
| 1059 | k = i + 1 |
| 1060 | b = c |
| 1061 | depth += 1 |
| 1062 | |
| 1063 | dsk[(fmt, 0)] = ( |
| 1064 | empty_safe_aggregate, |