MCPcopy
hub / github.com/dgraph-io/dgraph / poll

Method poll

graphql/subscription/poller.go:164–233  ·  view source on GitHub ↗
(req *pollRequest)

Source from the content-addressed store, hash-verified

162}
163
164func (p *Poller) poll(req *pollRequest) {
165 p.RLock()
166 resolver := p.resolver
167 p.RUnlock()
168
169 pollID := uint64(0)
170 for {
171 pollID++
172 time.Sleep(x.Config.GraphQL.GetDuration("poll-interval"))
173
174 globalEpoch := atomic.LoadUint64(p.globalEpoch)
175 if req.localEpoch != globalEpoch || globalEpoch == math.MaxUint64 {
176 // There is a schema change since local epoch is different from global schema epoch.
177 // We'll terminate all the subscriptions for this bucket, so that all clients can
178 // reconnect and listen for the new schema.
179 p.terminateSubscriptions(req.bucketID)
180 }
181
182 ctx := x.AttachAccessJwt(context.Background(), &http.Request{Header: req.graphqlReq.Header})
183 res := resolver.Resolve(ctx, req.graphqlReq)
184
185 currentHash := farm.Fingerprint64(res.Data.Bytes())
186
187 if req.prevHash == currentHash {
188 if pollID%2 != 0 {
189 // Don't update if there is no change in response.
190 continue
191 }
192 // Every second poll, we'll check if there is any active subscription for the
193 // current goroutine. If not we'll terminate this poll.
194 p.Lock()
195 subscribers, ok := p.pollRegistry[req.bucketID]
196 if !ok || len(subscribers) == 0 {
197 delete(p.pollRegistry, req.bucketID)
198 p.Unlock()
199 return
200 }
201 for subscriberID, subscriber := range subscribers {
202 if !subscriber.expiry.IsZero() && time.Now().After(subscriber.expiry) {
203 p.terminateSubscription(req.bucketID, subscriberID)
204 }
205
206 }
207 p.Unlock()
208 continue
209 }
210 req.prevHash = currentHash
211
212 p.Lock()
213 subscribers, ok := p.pollRegistry[req.bucketID]
214 if !ok || len(subscribers) == 0 {
215 // There are no subscribers to push the update to. So, kill the current polling
216 // goroutine.
217 delete(p.pollRegistry, req.bucketID)
218 p.Unlock()
219 return
220 }
221

Callers 1

AddSubscriberMethod · 0.95

Calls 9

terminateSubscriptionMethod · 0.95
AttachAccessJwtFunction · 0.92
RLockMethod · 0.80
RUnlockMethod · 0.80
OutputMethod · 0.80
ResolveMethod · 0.65
LockMethod · 0.45
UnlockMethod · 0.45

Tested by

no test coverage detected