(wg *sync.WaitGroup, name2Location map[string]string)
| 117 | } |
| 118 | |
| 119 | func (tr *TaskRunner) connectExternalInputs(wg *sync.WaitGroup, name2Location map[string]string) { |
| 120 | firstTask := tr.Tasks[0] |
| 121 | for i, shard := range firstTask.Inputs { |
| 122 | d := shard.Parent |
| 123 | readChanName := tr.option.ExecutableFileHash + "-" + shard.Name() |
| 124 | // println("taskGroup", tr.option.TaskGroupId, "firstTask", firstTask.Name(), "trying to read from:", readChanName, len(firstTask.InputChans)) |
| 125 | rawChan, err := netchan.GetDirectReadChannel(tr.option.TaskTlsConfig, readChanName, name2Location[readChanName], tr.FlowContext.ChannelBufferSize) |
| 126 | if err != nil { |
| 127 | log.Panic(err) |
| 128 | } |
| 129 | inChanStatus := netchan.ConnectRawReadChannelToTyped(rawChan, firstTask.InputChans[i], d.Type, wg) |
| 130 | inChanStatus.Name = shard.DisplayName() |
| 131 | tr.executorStatus.InputChannelStatuses = append(tr.executorStatus.InputChannelStatuses, inChanStatus) |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | func (tr *TaskRunner) connectExternalInputChannels(wg *sync.WaitGroup) { |
| 136 | // this is only for Channel dataset |
no test coverage detected