MCPcopy
hub / github.com/dgraph-io/dgraph / AddSubscriber

Method AddSubscriber

graphql/subscription/poller.go:58–154  ·  view source on GitHub ↗

AddSubscriber tries to add subscription into the existing polling goroutine if it exists. If it doesn't exist, then it creates a new polling goroutine for the given request.

(req *schema.Request)

Source from the content-addressed store, hash-verified

56// AddSubscriber tries to add subscription into the existing polling goroutine if it exists.
57// If it doesn't exist, then it creates a new polling goroutine for the given request.
58func (p *Poller) AddSubscriber(req *schema.Request) (*SubscriberResponse, error) {
59 p.RLock()
60 resolver := p.resolver
61 p.RUnlock()
62
63 localEpoch := atomic.LoadUint64(p.globalEpoch)
64 if err := resolver.ValidateSubscription(req); err != nil {
65 return nil, err
66 }
67
68 // find out the custom claims for auth, if any. As,
69 // We also need to use authVariables in generating the hashed bucketID
70 authMeta := resolver.Schema().Meta().AuthMeta()
71 ctx, err := authMeta.AttachAuthorizationJwt(context.Background(), req.Header)
72 if err != nil {
73 return nil, err
74 }
75 customClaims, err := authMeta.ExtractCustomClaims(ctx)
76 if err != nil {
77 return nil, err
78 }
79 // for the cases when no expiry is given in jwt or subscription doesn't have any authorization,
80 // we set their expiry to zero time
81 if customClaims.RegisteredClaims.ExpiresAt == nil {
82 customClaims.RegisteredClaims.ExpiresAt = jwt.NewNumericDate(time.Time{})
83 }
84
85 buf, err := json.Marshal(req)
86 x.Check(err)
87 var bucketID uint64
88 if customClaims.AuthVariables != nil {
89
90 // TODO - Add custom marshal function that marshal's the json in sorted order.
91 authvariables, err := json.Marshal(customClaims.AuthVariables)
92 if err != nil {
93 return nil, err
94 }
95 bucketID = farm.Fingerprint64(append(buf, authvariables...))
96 } else {
97 bucketID = farm.Fingerprint64(buf)
98 }
99 p.Lock()
100 defer p.Unlock()
101
102 res := resolver.Resolve(x.AttachAccessJwt(context.Background(),
103 &http.Request{Header: req.Header}), req)
104 if len(res.Errors) != 0 {
105 return nil, res.Errors
106 }
107
108 prevHash := farm.Fingerprint64(res.Data.Bytes())
109
110 updateCh := make(chan interface{}, 10)
111 updateCh <- res.Output()
112
113 subscriptionID := p.subscriptionID
114 // Increment ID for next subscription.
115 p.subscriptionID++

Callers 1

SubscribeMethod · 0.80

Calls 15

pollMethod · 0.95
CheckFunction · 0.92
AttachAccessJwtFunction · 0.92
RLockMethod · 0.80
RUnlockMethod · 0.80
ValidateSubscriptionMethod · 0.80
AuthMetaMethod · 0.80
ExtractCustomClaimsMethod · 0.80
OutputMethod · 0.80
InfofMethod · 0.80
MetaMethod · 0.65

Tested by

no test coverage detected