Map can work with multiple kinds of inputs and outputs. Input types: 1. a single value; 2. (key, value): the most common format for a key-value pair; 3. (key, values): the output of GroupByKey(); 4. (key, values1, values2): the output of CoGroup(); 5. (key, value1, value2): the output of Join(). Output types: 1. return single
(f interface{})
| 17 | // 3. return no value |
| 18 | // 4. return no value, but the last parameter is an output channel |
| 19 | func (d *Dataset) Map(f interface{}) *Dataset { |
| 20 | outType := guessFunctionOutputType(f) |
| 21 | ret, step := add1ShardTo1Step(d, outType) |
| 22 | step.Name = "Map" |
| 23 | step.Function = func(task *Task) { |
| 24 | |
| 25 | invokeMapFunc := _buildMapperFunction(f, task) |
| 26 | |
| 27 | for input := range task.InputChan() { |
| 28 | invokeMapFunc(input) |
| 29 | } |
| 30 | // println("exiting d:", d.Id, "step:", step.Id, "task:", task.Id) |
| 31 | } |
| 32 | return ret |
| 33 | } |
| 34 | |
| 35 | func _buildMapperFunction(f interface{}, task *Task) func(input reflect.Value) { |
| 36 | fn, ft := reflect.ValueOf(f), reflect.TypeOf(f) |