MCPcopy
hub / github.com/netdata/netdata / enqueue

Method enqueue

src/go/plugin/framework/functions/scheduler.go:51–93  ·  view source on GitHub ↗
(req *invocationRequest)

Source from the content-addressed store, hash-verified

49}
50
51func (s *keyScheduler) enqueue(req *invocationRequest) error {
52 if req == nil || req.fn == nil || req.fn.UID == "" || req.scheduleKey == "" {
53 return errSchedulerInvalid
54 }
55
56 s.mux.Lock()
57 defer s.mux.Unlock()
58
59 // Block until there is space, or the scheduler is stopped. We must not
60 // drop dyncfg commands silently: an awaited enable/disable that is dropped
61 // here would wedge jobmgr's wait gate by leaving it waiting for a
62 // completion that will never arrive. Back-pressure flows upstream: the
63 // manager run-loop stops draining stdin, and netdata's write blocks on
64 // the OS pipe.
65 for {
66 if s.stopping || !s.accepting {
67 return errSchedulerStopping
68 }
69 if s.maxPending <= 0 || s.pending < s.maxPending {
70 break
71 }
72 s.enqueueWaiters++
73 s.cond.Wait()
74 s.enqueueWaiters--
75 }
76
77 lane := s.lanes[req.scheduleKey]
78 if lane == nil {
79 lane = &scheduleLane{}
80 s.lanes[req.scheduleKey] = lane
81 }
82
83 s.pending++
84 if lane.ownerUID == "" {
85 lane.ownerUID = req.fn.UID
86 s.ready = append(s.ready, req)
87 s.cond.Signal()
88 return nil
89 }
90
91 lane.queue = append(lane.queue, req)
92 return nil
93}
94
95func (s *keyScheduler) next() (*invocationRequest, bool) {
96 s.mux.Lock()

Calls 3

LockMethod · 0.80
UnlockMethod · 0.80
WaitMethod · 0.65