MCPcopy Index your code
hub / github.com/deepflowio/deepflow / startSubDomainRefreshMonitor

Method startSubDomainRefreshMonitor

server/controller/manager/task.go:120–155  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

118}
119
120func (t *Task) startSubDomainRefreshMonitor() {
121 go func() {
122 ticker := time.NewTicker(time.Millisecond * 200)
123 defer ticker.Stop()
124
125 LOOP:
126 for {
127 select {
128 case <-ticker.C:
129 for item := range t.subDomainRefreshSignals.IterBuffered() {
130 lcuuid := item.Key
131 signal := item.Val
132
133 // TODO 考虑改为并发
134 if signal.Len() != 0 {
135 signal.Get()
136 log.Infof("task (%s) sub_domain (%s) call recorder refresh", t.DomainName, lcuuid, t.LogPrefixORGID)
137
138 if err := t.Recorder.Refresh(recorder.RefreshTargetSubDomain, t.Cloud.GetSubDomainResource(lcuuid)); err != nil {
139 if errors.Is(err, recorder.RefreshConflictError) {
140 log.Warningf("task (%s) sub_domain (%s) refresh conflict, retry after 5 seconds", t.DomainName, lcuuid, t.LogPrefixORGID)
141 signal.Put(struct{}{})
142 time.Sleep(time.Duration(recorderRefreshTryInterval) * time.Second)
143 } else {
144 log.Warningf("task (%s) sub_domain (%s) refresh failed: %s", t.DomainName, lcuuid, err.Error(), t.LogPrefixORGID)
145 }
146 }
147 }
148 }
149 case <-t.tCtx.Done():
150 break LOOP
151 }
152 }
153 }()
154
155}
156
157func (t *Task) Stop() {
158 t.Recorder.Stop()

Callers 1

StartMethod · 0.95

Calls 9

StopMethod · 0.65
LenMethod · 0.65
GetMethod · 0.65
RefreshMethod · 0.65
PutMethod · 0.65
ErrorMethod · 0.65
InfofMethod · 0.45
GetSubDomainResourceMethod · 0.45
WarningfMethod · 0.45

Tested by

no test coverage detected