f(V, V) bool: less-than function. The new Dataset contains K,V.
(f interface{})
| 14 | // f(V, V) bool : less than function |
| 15 | // New Dataset contains K,V |
| 16 | func (d *Dataset) LocalSort(f interface{}) *Dataset { |
| 17 | if f == nil && d.IsKeyLocalSorted { |
| 18 | return d |
| 19 | } |
| 20 | ret, step := add1ShardTo1Step(d, d.Type) |
| 21 | ret.IsKeyPartitioned = d.IsKeyPartitioned |
| 22 | if f == nil { |
| 23 | ret.IsKeyLocalSorted = true |
| 24 | } |
| 25 | step.Name = "LocalSort" |
| 26 | step.Function = func(task *Task) { |
| 27 | outChan := task.Outputs[0].WriteChan |
| 28 | var kvs []interface{} |
| 29 | for input := range task.InputChan() { |
| 30 | kvs = append(kvs, input.Interface()) |
| 31 | } |
| 32 | if len(kvs) == 0 { |
| 33 | return |
| 34 | } |
| 35 | comparator := getLessThanComparator(d.Type, reflect.ValueOf(kvs[0]), f) |
| 36 | timsort.Sort(kvs, comparator) |
| 37 | |
| 38 | for _, kv := range kvs { |
| 39 | outChan.Send(reflect.ValueOf(kv)) |
| 40 | // println(task.Name(), "sent kv index:", i) |
| 41 | } |
| 42 | } |
| 43 | return ret |
| 44 | } |
| 45 | |
| 46 | func (d *Dataset) MergeSorted(f interface{}) (ret *Dataset) { |
| 47 | ret = d.context.newNextDataset(1, d.Type) |
no test coverage detected