MCPcopy
hub / github.com/dgraph-io/dgraph / SubscribeForAclUpdates

Function SubscribeForAclUpdates

edgraph/access.go:352–390  ·  view source on GitHub ↗

SubscribeForAclUpdates subscribes for ACL predicates and updates the acl cache.

(closer *z.Closer)

Source from the content-addressed store, hash-verified

350
351// SubscribeForAclUpdates subscribes for ACL predicates and updates the acl cache.
352func SubscribeForAclUpdates(closer *z.Closer) {
353 defer func() {
354 glog.Infoln("RefreshAcls closed")
355 closer.Done()
356 }()
357 if worker.Config.AclSecretKey == nil {
358 // the acl feature is not turned on
359 return
360 }
361
362 var maxRefreshTs uint64
363 retrieveAcls := func(ns uint64, refreshTs uint64) error {
364 if refreshTs <= maxRefreshTs {
365 return nil
366 }
367 maxRefreshTs = refreshTs
368 return refreshAclCache(closer.Ctx(), ns, refreshTs)
369 }
370
371 closer.AddRunning(1)
372 go worker.SubscribeForUpdates(aclPrefixes, x.IgnoreBytes, func(kvs *bpb.KVList) {
373 if kvs == nil || len(kvs.Kv) == 0 {
374 return
375 }
376 kv := x.KvWithMaxVersion(kvs, aclPrefixes)
377 pk, err := x.Parse(kv.GetKey())
378 if err != nil {
379 glog.Fatalf("Got a key from subscription which is not parsable: %s", err)
380 }
381 glog.V(3).Infof("Got ACL update via subscription for attr: %s", pk.Attr)
382
383 ns, _ := x.ParseNamespaceAttr(pk.Attr)
384 if err := retrieveAcls(ns, kv.GetVersion()); err != nil {
385 glog.Errorf("Error while retrieving acls: %v", err)
386 }
387 }, 1, closer)
388
389 <-closer.HasBeenClosed()
390}
391
392const queryAcls = `
393{

Callers 1

run (Function) · 0.92

Calls 12

SubscribeForUpdates (Function) · 0.92
KvWithMaxVersion (Function) · 0.92
Parse (Function) · 0.92
ParseNamespaceAttr (Function) · 0.92
refreshAclCache (Function) · 0.85
Fatalf (Method) · 0.80
Infof (Method) · 0.80
GetVersion (Method) · 0.65
Done (Method) · 0.45
Ctx (Method) · 0.45
GetKey (Method) · 0.45
Errorf (Method) · 0.45

Tested by

no test coverage detected