github.com/dask/dask

Method reduction

dask/bag/core.py:1007–1075

Reduce collection with reduction operators: perpartition is applied to each partition, then aggregate combines the per-partition results, in a tree of groups of at most split_every (default 8) when there are more partitions than that.

reduction(self, perpartition, aggregate, split_every=None, out_type=Item, name=None)
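When there is only one aggregation step (npartitions no larger than split_every, or split_every=False), the graph built in the source listing on this page amounts to one perpartition call per partition followed by one aggregate call over their results. The sketch below models that with plain Python lists standing in for bag partitions. flat_reduction is a hypothetical helper, not part of dask, and it ignores the empty-partition handling done by empty_safe_apply and empty_safe_aggregate; the three-way split of range(10) is chosen for illustration only.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
P = TypeVar("P")
R = TypeVar("R")


def flat_reduction(
    partitions: List[List[T]],
    perpartition: Callable[[List[T]], P],
    aggregate: Callable[[List[P]], R],
) -> R:
    """Hypothetical model: one perpartition call per partition, then one aggregate."""
    partial_results = [perpartition(part) for part in partitions]
    return aggregate(partial_results)


# range(10) split into three lists, standing in for db.from_sequence(range(10))
parts = [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]

total = flat_reduction(parts, sum, sum)  # same operator pair as the docstring example
count = flat_reduction(parts, len, sum)  # count per partition, then add the counts
print(total, count)  # 45 10
```

Swapping the operator pair changes the statistic without changing the two-phase shape: (len, sum) counts elements, (max, max) finds the largest one.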


        return self.reduction(func, agg, out_type=Bag, name="distinct")

    def reduction(
        self, perpartition, aggregate, split_every=None, out_type=Item, name=None
    ):
        """Reduce collection with reduction operators.

        Parameters
        ----------
        perpartition: function
            reduction to apply to each partition
        aggregate: function
            reduction to apply to the results of all partitions
        split_every: int (optional)
            Group partitions into groups of this size while performing reduction
            Defaults to 8
        out_type: {Bag, Item}
            The out type of the result, Item if a single element, Bag if a list
            of elements. Defaults to Item.

        Examples
        --------
        >>> import dask.bag as db
        >>> b = db.from_sequence(range(10))
        >>> b.reduction(sum, sum).compute()
        45
        """
        if split_every is None:
            split_every = 8
        if split_every is False:
            split_every = self.npartitions

        token = tokenize(self, perpartition, aggregate, split_every)
        a = f"{name or funcname(perpartition)}-part-{token}"
        is_last = self.npartitions == 1
        dsk = {
            (a, i): (empty_safe_apply, perpartition, (self.name, i), is_last)
            for i in range(self.npartitions)
        }
        k = self.npartitions
        b = a
        fmt = f"{name or funcname(aggregate)}-aggregate-{token}"
        depth = 0

        while k > split_every:
            c = fmt + str(depth)
            for i, inds in enumerate(partition_all(split_every, range(k))):
                dsk[(c, i)] = (
                    empty_safe_aggregate,
                    aggregate,
                    [(b, j) for j in inds],
                    False,
                )

            k = i + 1
            b = c
            depth += 1

        dsk[(fmt, 0)] = (
            empty_safe_aggregate,
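The while loop above builds a tree: each pass groups the current k results into chunks of at most split_every, adds one aggregate task per chunk under the key prefix fmt + str(depth), and sets k to the number of chunks; the final (fmt, 0) task then aggregates the remaining k ≤ split_every results. The sketch below evaluates that same shape eagerly on plain lists so the level sizes are visible. tree_reduction and the local partition_all are hypothetical stand-ins (the latter mimics toolz.partition_all); the sketch does not model task keys, laziness, or dask's empty-partition handling.

```python
from typing import Callable, List, Tuple, TypeVar, Union

T = TypeVar("T")
R = TypeVar("R")


def partition_all(n: int, seq: range) -> List[List[int]]:
    """Consecutive chunks of at most n items, like toolz.partition_all."""
    items = list(seq)
    return [items[i:i + n] for i in range(0, len(items), n)]


def tree_reduction(
    partitions: List[List[T]],
    perpartition: Callable[[List[T]], R],
    aggregate: Callable[[List[R]], R],
    split_every: Union[int, bool, None] = None,
) -> Tuple[R, List[int]]:
    """Hypothetical eager model of the listing's graph.

    Returns the result and the number of tasks at each intermediate level.
    """
    # Same defaults as the listing: None -> 8, False -> one final aggregate
    if split_every is None:
        split_every = 8
    if split_every is False:
        split_every = len(partitions)
    # Guard added in this model only: groups of 1 would never shrink k
    if split_every < 2 and len(partitions) > 1:
        raise ValueError("split_every must be at least 2")

    level = [perpartition(part) for part in partitions]  # the "-part-" tasks
    sizes: List[int] = []
    k = len(level)
    while k > split_every:
        # One "-aggregate-<depth>" task per group of at most split_every inputs
        level = [aggregate([level[j] for j in inds])
                 for inds in partition_all(split_every, range(k))]
        k = len(level)
        sizes.append(k)
    # The final (fmt, 0) task aggregates the k <= split_every remaining results
    return aggregate(level), sizes


parts = [[i] for i in range(100)]  # 100 one-element partitions
print(tree_reduction(parts, sum, sum))                     # (4950, [13, 2])
print(tree_reduction(parts, sum, sum, split_every=4))      # (4950, [25, 7, 2])
print(tree_reduction(parts, sum, sum, split_every=False))  # (4950, [])
```

With 100 partitions and the default split_every of 8, the model produces two intermediate levels (13 tasks, then 2) before the final aggregate; split_every=False collapses everything into that single final step.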

Callers (15)

fold (method)
frequencies (method)
topk (method)
distinct (method)
sum (method)
max (method)
min (method)
any (method)
all (method)
count (method)
mean (method)
var (method)

Calls (6)

funcname (function)
Item (class)
Bag (class)
from_collections (method)
pop (method)
tokenize (function)

Tested by (7)

test_reductions_are_lazy (function)
test_reduction_names (function)
test_tree_reductions (function)
test_weighted_reduction (function)
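The intermediate levels in the source listing call aggregate on lists of earlier aggregate outputs, and the final (fmt, 0) task calls it once more, so whenever npartitions exceeds split_every an aggregate must accept its own results as input. sum does; an aggregate that turns partial results into a finished statistic, as a mean or variance would (mean and var are among the callers listed above), does not. The sketch below illustrates this with plain lists and hypothetical helpers (tree, mean_chunk, combine_pairs, naive_mean_aggregate); it is not dask's implementation of mean. Keeping (total, count) pairs through every level and dividing afterwards works with any split_every, while the naive aggregate only works with a single aggregation step, which is what split_every=False requests in the listing.

```python
from typing import List, Sequence, Tuple

Pair = Tuple[float, int]


def tree(partitions, perpartition, aggregate, split_every):
    """Hypothetical eager model of the listing's tree: aggregate is reused at every level."""
    level = [perpartition(p) for p in partitions]
    while len(level) > split_every:
        level = [aggregate(level[i:i + split_every])
                 for i in range(0, len(level), split_every)]
    return aggregate(level)


def mean_chunk(seq: Sequence[float]) -> Pair:
    # Per-partition partial result: (running total, element count)
    return (float(sum(seq)), len(seq))


def combine_pairs(pairs: List[Pair]) -> Pair:
    # Output has the same shape as each input, so it can be aggregated again
    return (sum(t for t, _ in pairs), sum(n for _, n in pairs))


def naive_mean_aggregate(pairs: List[Pair]) -> float:
    # Returns a float, which a later aggregation level cannot unpack
    return sum(t for t, _ in pairs) / sum(n for _, n in pairs)


parts = [[float(x)] for x in range(20)]  # 20 one-element partitions

total, n = tree(parts, mean_chunk, combine_pairs, split_every=8)
mean = total / n  # divide after the reduction, outside aggregate
print(mean)  # 9.5

naive_failed = False
try:
    tree(parts, mean_chunk, naive_mean_aggregate, split_every=8)
except TypeError:
    naive_failed = True  # intermediate floats reached the final aggregate

# With a single aggregation step (like split_every=False), the naive version works
flat_mean = tree(parts, mean_chunk, naive_mean_aggregate, split_every=len(parts))
print(naive_failed, flat_mean)  # True 9.5
```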