SubscribeForAclUpdates subscribes for ACL predicates and updates the ACL cache.
(closer *z.Closer)
| 350 | |
| 351 | // SubscribeForAclUpdates subscribes for ACL predicates and updates the acl cache. |
| 352 | func SubscribeForAclUpdates(closer *z.Closer) { |
| 353 | defer func() { |
| 354 | glog.Infoln("RefreshAcls closed") |
| 355 | closer.Done() |
| 356 | }() |
| 357 | if worker.Config.AclSecretKey == nil { |
| 358 | // the acl feature is not turned on |
| 359 | return |
| 360 | } |
| 361 | |
| 362 | var maxRefreshTs uint64 |
| 363 | retrieveAcls := func(ns uint64, refreshTs uint64) error { |
| 364 | if refreshTs <= maxRefreshTs { |
| 365 | return nil |
| 366 | } |
| 367 | maxRefreshTs = refreshTs |
| 368 | return refreshAclCache(closer.Ctx(), ns, refreshTs) |
| 369 | } |
| 370 | |
| 371 | closer.AddRunning(1) |
| 372 | go worker.SubscribeForUpdates(aclPrefixes, x.IgnoreBytes, func(kvs *bpb.KVList) { |
| 373 | if kvs == nil || len(kvs.Kv) == 0 { |
| 374 | return |
| 375 | } |
| 376 | kv := x.KvWithMaxVersion(kvs, aclPrefixes) |
| 377 | pk, err := x.Parse(kv.GetKey()) |
| 378 | if err != nil { |
| 379 | glog.Fatalf("Got a key from subscription which is not parsable: %s", err) |
| 380 | } |
| 381 | glog.V(3).Infof("Got ACL update via subscription for attr: %s", pk.Attr) |
| 382 | |
| 383 | ns, _ := x.ParseNamespaceAttr(pk.Attr) |
| 384 | if err := retrieveAcls(ns, kv.GetVersion()); err != nil { |
| 385 | glog.Errorf("Error while retrieving acls: %v", err) |
| 386 | } |
| 387 | }, 1, closer) |
| 388 | |
| 389 | <-closer.HasBeenClosed() |
| 390 | } |
| 391 | |
| 392 | const queryAcls = ` |
| 393 | { |
no test coverage detected