AddSubscriber tries to add the subscription to an existing polling goroutine, if one exists. If none exists, it creates a new polling goroutine for the given request.
(req *schema.Request)
| 56 | // AddSubscriber tries to add subscription into the existing polling goroutine if it exists. |
| 57 | // If it doesn't exist, then it creates a new polling goroutine for the given request. |
| 58 | func (p *Poller) AddSubscriber(req *schema.Request) (*SubscriberResponse, error) { |
| 59 | p.RLock() |
| 60 | resolver := p.resolver |
| 61 | p.RUnlock() |
| 62 | |
| 63 | localEpoch := atomic.LoadUint64(p.globalEpoch) |
| 64 | if err := resolver.ValidateSubscription(req); err != nil { |
| 65 | return nil, err |
| 66 | } |
| 67 | |
| 68 | // find out the custom claims for auth, if any. As, |
| 69 | // We also need to use authVariables in generating the hashed bucketID |
| 70 | authMeta := resolver.Schema().Meta().AuthMeta() |
| 71 | ctx, err := authMeta.AttachAuthorizationJwt(context.Background(), req.Header) |
| 72 | if err != nil { |
| 73 | return nil, err |
| 74 | } |
| 75 | customClaims, err := authMeta.ExtractCustomClaims(ctx) |
| 76 | if err != nil { |
| 77 | return nil, err |
| 78 | } |
| 79 | // for the cases when no expiry is given in jwt or subscription doesn't have any authorization, |
| 80 | // we set their expiry to zero time |
| 81 | if customClaims.RegisteredClaims.ExpiresAt == nil { |
| 82 | customClaims.RegisteredClaims.ExpiresAt = jwt.NewNumericDate(time.Time{}) |
| 83 | } |
| 84 | |
| 85 | buf, err := json.Marshal(req) |
| 86 | x.Check(err) |
| 87 | var bucketID uint64 |
| 88 | if customClaims.AuthVariables != nil { |
| 89 | |
| 90 | // TODO - Add custom marshal function that marshal's the json in sorted order. |
| 91 | authvariables, err := json.Marshal(customClaims.AuthVariables) |
| 92 | if err != nil { |
| 93 | return nil, err |
| 94 | } |
| 95 | bucketID = farm.Fingerprint64(append(buf, authvariables...)) |
| 96 | } else { |
| 97 | bucketID = farm.Fingerprint64(buf) |
| 98 | } |
| 99 | p.Lock() |
| 100 | defer p.Unlock() |
| 101 | |
| 102 | res := resolver.Resolve(x.AttachAccessJwt(context.Background(), |
| 103 | &http.Request{Header: req.Header}), req) |
| 104 | if len(res.Errors) != 0 { |
| 105 | return nil, res.Errors |
| 106 | } |
| 107 | |
| 108 | prevHash := farm.Fingerprint64(res.Data.Bytes()) |
| 109 | |
| 110 | updateCh := make(chan interface{}, 10) |
| 111 | updateCh <- res.Output() |
| 112 | |
| 113 | subscriptionID := p.subscriptionID |
| 114 | // Increment ID for next subscription. |
| 115 | p.subscriptionID++ |
no test coverage detected