MCPcopy
hub / github.com/chrislusf/glow / LocalSort

Method LocalSort

flow/dataset_sort.go:16–44  ·  view source on GitHub ↗

f(V, V) bool : less than function New Dataset contains K,V

(f interface{})

Source from the content-addressed store, hash-verified

14// f(V, V) bool : less than function
15// New Dataset contains K,V
16func (d *Dataset) LocalSort(f interface{}) *Dataset {
17 if f == nil && d.IsKeyLocalSorted {
18 return d
19 }
20 ret, step := add1ShardTo1Step(d, d.Type)
21 ret.IsKeyPartitioned = d.IsKeyPartitioned
22 if f == nil {
23 ret.IsKeyLocalSorted = true
24 }
25 step.Name = "LocalSort"
26 step.Function = func(task *Task) {
27 outChan := task.Outputs[0].WriteChan
28 var kvs []interface{}
29 for input := range task.InputChan() {
30 kvs = append(kvs, input.Interface())
31 }
32 if len(kvs) == 0 {
33 return
34 }
35 comparator := getLessThanComparator(d.Type, reflect.ValueOf(kvs[0]), f)
36 timsort.Sort(kvs, comparator)
37
38 for _, kv := range kvs {
39 outChan.Send(reflect.ValueOf(kv))
40 // println(task.Name(), "sent kv index:", i)
41 }
42 }
43 return ret
44}
45
46func (d *Dataset) MergeSorted(f interface{}) (ret *Dataset) {
47 ret = d.context.newNextDataset(1, d.Type)

Callers 6

ReduceByKeyMethod · 0.95
SortMethod · 0.95
GroupByKeyMethod · 0.80
CoGroupMethod · 0.80
JoinMethod · 0.80

Calls 4

add1ShardTo1StepFunction · 0.85
getLessThanComparatorFunction · 0.85
InputChanMethod · 0.80
SortMethod · 0.80

Tested by

no test coverage detected