MCPcopy
hub / github.com/chrislusf/glow / setupInputChannels

Method setupInputChannels

driver/scheduler/scheduler_execute.go:78–99  ·  view source on GitHub ↗
(fc *flow.FlowContext, task *flow.Task, location resource.Location, waitGroup *sync.WaitGroup)

Source from the content-addressed store, hash-verified

76}
77
78func (s *Scheduler) setupInputChannels(fc *flow.FlowContext, task *flow.Task, location resource.Location, waitGroup *sync.WaitGroup) {
79 if len(task.Inputs) > 0 {
80 return
81 }
82 ds := task.Outputs[0].Parent
83 if len(ds.ExternalInputChans) == 0 {
84 return
85 }
86 // connect local typed chan to remote raw chan
87 // write to the dataset location in the cluster so that the task can be retried if needed.
88 for i, inChan := range ds.ExternalInputChans {
89 inputChanName := fmt.Sprintf("%s-ct-%d-input-%d-p-%d", s.Option.ExecutableFileHash, fc.Id, ds.Id, i)
90 // println("setup input channel for", task.Name(), "on", location.URL())
91 s.shardLocator.SetShardLocation(inputChanName, location)
92 rawChan, err := netchan.GetDirectSendChannel(s.Option.TlsConfig, inputChanName, location.URL(), waitGroup)
93 if err != nil {
94 log.Panic(err)
95 }
96 // println("writing", inputChanName, "to", location.URL())
97 netchan.ConnectTypedWriteChannelToRaw(inChan, rawChan, waitGroup)
98 }
99}
100
101func (s *Scheduler) setupOutputChannels(shards []*flow.DatasetShard, waitGroup *sync.WaitGroup) {
102 for _, shard := range shards {

Callers 1

Calls 2

SetShardLocationMethod · 0.80
URLMethod · 0.80

Tested by

no test coverage detected