MCPcopy
hub / github.com/dgraph-io/dgraph / SubscribeForUpdates

Function SubscribeForUpdates

worker/groups.go:1119–1167  ·  view source on GitHub ↗

SubscribeForUpdates will listen for updates for the given group.

(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList),
	group uint32, closer *z.Closer)

Source from the content-addressed store, hash-verified

1117
1118// SubscribeForUpdates will listen for updates for the given group.
1119func SubscribeForUpdates(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList),
1120 group uint32, closer *z.Closer) {
1121
1122 var prefix []byte
1123 if len(prefixes) > 0 {
1124 prefix = prefixes[0]
1125 }
1126 defer func() {
1127 glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix)
1128 closer.Done()
1129 }()
1130
1131 listen := func() error {
1132 // Connect to any of the group 1 nodes.
1133 members := groups().AnyTwoServers(group)
1134 // There may be a lag while starting so keep retrying.
1135 if len(members) == 0 {
1136 return fmt.Errorf("unable to find any servers for group: %d", group)
1137 }
1138 pool := conn.GetPools().Connect(members[0], x.WorkerConfig.TLSClientConfig)
1139 client := pb.NewWorkerClient(pool.Get())
1140
1141 // Get Subscriber stream.
1142 stream, err := client.Subscribe(closer.Ctx(),
1143 &pb.SubscriptionRequest{Matches: x.PrefixesToMatches(prefixes, ignore)})
1144 if err != nil {
1145 return errors.Wrapf(err, "error from client.subscribe")
1146 }
1147 for {
1148 // Listen for updates.
1149 kvs, err := stream.Recv()
1150 if err != nil {
1151 return errors.Wrapf(err, "while receiving from stream")
1152 }
1153 cb(kvs)
1154 }
1155 }
1156
1157 for {
1158 if err := listen(); err != nil {
1159 glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. closer err: %v\n",
1160 prefix, err, closer.Ctx().Err())
1161 }
1162 if closer.Ctx().Err() != nil {
1163 return
1164 }
1165 time.Sleep(time.Second)
1166 }
1167}

Callers 2

SubscribeForAclUpdatesFunction · 0.92
newAdminResolverFunction · 0.92

Calls 13

SubscribeMethod · 0.95
GetPoolsFunction · 0.92
NewWorkerClientFunction · 0.92
PrefixesToMatchesFunction · 0.92
groupsFunction · 0.85
InfofMethod · 0.80
AnyTwoServersMethod · 0.80
ConnectMethod · 0.65
GetMethod · 0.65
RecvMethod · 0.65
DoneMethod · 0.45
ErrorfMethod · 0.45

Tested by

no test coverage detected