(fc *flow.FlowContext, task *flow.Task, location resource.Location, waitGroup *sync.WaitGroup)
| 76 | } |
| 77 | |
| 78 | func (s *Scheduler) setupInputChannels(fc *flow.FlowContext, task *flow.Task, location resource.Location, waitGroup *sync.WaitGroup) { |
| 79 | if len(task.Inputs) > 0 { |
| 80 | return |
| 81 | } |
| 82 | ds := task.Outputs[0].Parent |
| 83 | if len(ds.ExternalInputChans) == 0 { |
| 84 | return |
| 85 | } |
| 86 | // connect local typed chan to remote raw chan |
| 87 | // write to the dataset location in the cluster so that the task can be retried if needed. |
| 88 | for i, inChan := range ds.ExternalInputChans { |
| 89 | inputChanName := fmt.Sprintf("%s-ct-%d-input-%d-p-%d", s.Option.ExecutableFileHash, fc.Id, ds.Id, i) |
| 90 | // println("setup input channel for", task.Name(), "on", location.URL()) |
| 91 | s.shardLocator.SetShardLocation(inputChanName, location) |
| 92 | rawChan, err := netchan.GetDirectSendChannel(s.Option.TlsConfig, inputChanName, location.URL(), waitGroup) |
| 93 | if err != nil { |
| 94 | log.Panic(err) |
| 95 | } |
| 96 | // println("writing", inputChanName, "to", location.URL()) |
| 97 | netchan.ConnectTypedWriteChannelToRaw(inChan, rawChan, waitGroup) |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | func (s *Scheduler) setupOutputChannels(shards []*flow.DatasetShard, waitGroup *sync.WaitGroup) { |
| 102 | for _, shard := range shards { |
no test coverage detected