Requirement is TaskGroup Object is Agent's Location
(demands []market.Demand)
| 17 | // Requirement is TaskGroup |
| 18 | // Object is Agent's Location |
| 19 | func (s *Scheduler) Fetch(demands []market.Demand) { |
| 20 | var request resource.AllocationRequest |
| 21 | for _, d := range demands { |
| 22 | demand := d.Requirement.(*plan.TaskGroup) |
| 23 | request.Requests = append(request.Requests, resource.ComputeRequest{ |
| 24 | ComputeResource: resource.ComputeResource{ |
| 25 | CPUCount: 1, |
| 26 | CPULevel: 1, |
| 27 | MemoryMB: int64(s.Option.TaskMemoryMB), |
| 28 | }, |
| 29 | Inputs: s.findTaskGroupInputs(demand), |
| 30 | }) |
| 31 | } |
| 32 | |
| 33 | result, err := Assign(s.Leader, &request) |
| 34 | if err != nil { |
| 35 | log.Printf("%s Failed to allocate: %v", s.Leader, err) |
| 36 | time.Sleep(time.Millisecond * time.Duration(15000+rand.Int63n(5000))) |
| 37 | } else { |
| 38 | if len(result.Allocations) == 0 { |
| 39 | log.Printf("%s Failed to allocate any executor.", s.Leader) |
| 40 | time.Sleep(time.Millisecond * time.Duration(2000+rand.Int63n(1000))) |
| 41 | } else { |
| 42 | log.Printf("%s allocated %d executors.", s.Leader, len(result.Allocations)) |
| 43 | for _, allocation := range result.Allocations { |
| 44 | s.Market.AddSupply(market.Supply{ |
| 45 | Object: allocation, |
| 46 | }) |
| 47 | } |
| 48 | } |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | func (s *Scheduler) findTaskGroupInputs(tg *plan.TaskGroup) (ret []resource.DataResource) { |
| 53 | firstTask := tg.Tasks[0] |
nothing calls this directly
no test coverage detected