MCPcopy
hub / github.com/chrislusf/glow / Fetch

Method Fetch

driver/scheduler/scheduler_fetch.go:19–50  ·  view source on GitHub ↗

Requirement is a TaskGroup; Object is the Agent's Location.

(demands []market.Demand)

Source from the content-addressed store, hash-verified

17// Requirement is TaskGroup
18// Object is Agent's Location
19func (s *Scheduler) Fetch(demands []market.Demand) {
20 var request resource.AllocationRequest
21 for _, d := range demands {
22 demand := d.Requirement.(*plan.TaskGroup)
23 request.Requests = append(request.Requests, resource.ComputeRequest{
24 ComputeResource: resource.ComputeResource{
25 CPUCount: 1,
26 CPULevel: 1,
27 MemoryMB: int64(s.Option.TaskMemoryMB),
28 },
29 Inputs: s.findTaskGroupInputs(demand),
30 })
31 }
32
33 result, err := Assign(s.Leader, &request)
34 if err != nil {
35 log.Printf("%s Failed to allocate: %v", s.Leader, err)
36 time.Sleep(time.Millisecond * time.Duration(15000+rand.Int63n(5000)))
37 } else {
38 if len(result.Allocations) == 0 {
39 log.Printf("%s Failed to allocate any executor.", s.Leader)
40 time.Sleep(time.Millisecond * time.Duration(2000+rand.Int63n(1000)))
41 } else {
42 log.Printf("%s allocated %d executors.", s.Leader, len(result.Allocations))
43 for _, allocation := range result.Allocations {
44 s.Market.AddSupply(market.Supply{
45 Object: allocation,
46 })
47 }
48 }
49 }
50}
51
52func (s *Scheduler) findTaskGroupInputs(tg *plan.TaskGroup) (ret []resource.DataResource) {
53 firstTask := tg.Tasks[0]

Callers

nothing calls this directly

Calls 3

findTaskGroupInputsMethod · 0.95
AssignFunction · 0.85
AddSupplyMethod · 0.80

Tested by

no test coverage detected