(req *pollRequest)
| 162 | } |
| 163 | |
| 164 | func (p *Poller) poll(req *pollRequest) { |
| 165 | p.RLock() |
| 166 | resolver := p.resolver |
| 167 | p.RUnlock() |
| 168 | |
| 169 | pollID := uint64(0) |
| 170 | for { |
| 171 | pollID++ |
| 172 | time.Sleep(x.Config.GraphQL.GetDuration("poll-interval")) |
| 173 | |
| 174 | globalEpoch := atomic.LoadUint64(p.globalEpoch) |
| 175 | if req.localEpoch != globalEpoch || globalEpoch == math.MaxUint64 { |
| 176 | // There is a schema change since the local epoch is different from the global schema epoch. |
| 177 | // We'll terminate all the subscriptions for this bucket so that all clients can |
| 178 | // reconnect and listen for the new schema. |
| 179 | p.terminateSubscriptions(req.bucketID) |
| 180 | } |
| 181 | |
| 182 | ctx := x.AttachAccessJwt(context.Background(), &http.Request{Header: req.graphqlReq.Header}) |
| 183 | res := resolver.Resolve(ctx, req.graphqlReq) |
| 184 | |
| 185 | currentHash := farm.Fingerprint64(res.Data.Bytes()) |
| 186 | |
| 187 | if req.prevHash == currentHash { |
| 188 | if pollID%2 != 0 { |
| 189 | // Don't update if there is no change in response. |
| 190 | continue |
| 191 | } |
| 192 | // Every second poll, we'll check if there is any active subscription for the |
| 193 | // current goroutine. If not we'll terminate this poll. |
| 194 | p.Lock() |
| 195 | subscribers, ok := p.pollRegistry[req.bucketID] |
| 196 | if !ok || len(subscribers) == 0 { |
| 197 | delete(p.pollRegistry, req.bucketID) |
| 198 | p.Unlock() |
| 199 | return |
| 200 | } |
| 201 | for subscriberID, subscriber := range subscribers { |
| 202 | if !subscriber.expiry.IsZero() && time.Now().After(subscriber.expiry) { |
| 203 | p.terminateSubscription(req.bucketID, subscriberID) |
| 204 | } |
| 205 | |
| 206 | } |
| 207 | p.Unlock() |
| 208 | continue |
| 209 | } |
| 210 | req.prevHash = currentHash |
| 211 | |
| 212 | p.Lock() |
| 213 | subscribers, ok := p.pollRegistry[req.bucketID] |
| 214 | if !ok || len(subscribers) == 0 { |
| 215 | // There are no subscribers to push the update to, so kill the current polling |
| 216 | // goroutine. |
| 217 | delete(p.pollRegistry, req.bucketID) |
| 218 | p.Unlock() |
| 219 | return |
| 220 | } |
| 221 |
no test coverage detected