SubscribeForUpdates will listen for updates for the given group.
(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList), group uint32, closer *z.Closer)
| 1117 | |
| 1118 | // SubscribeForUpdates will listen for updates for the given group. |
| 1119 | func SubscribeForUpdates(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList), |
| 1120 | group uint32, closer *z.Closer) { |
| 1121 | |
| 1122 | var prefix []byte |
| 1123 | if len(prefixes) > 0 { |
| 1124 | prefix = prefixes[0] |
| 1125 | } |
| 1126 | defer func() { |
| 1127 | glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix) |
| 1128 | closer.Done() |
| 1129 | }() |
| 1130 | |
| 1131 | listen := func() error { |
| 1132 | // Connect to any of the group 1 nodes. |
| 1133 | members := groups().AnyTwoServers(group) |
| 1134 | // There may be a lag while starting so keep retrying. |
| 1135 | if len(members) == 0 { |
| 1136 | return fmt.Errorf("unable to find any servers for group: %d", group) |
| 1137 | } |
| 1138 | pool := conn.GetPools().Connect(members[0], x.WorkerConfig.TLSClientConfig) |
| 1139 | client := pb.NewWorkerClient(pool.Get()) |
| 1140 | |
| 1141 | // Get Subscriber stream. |
| 1142 | stream, err := client.Subscribe(closer.Ctx(), |
| 1143 | &pb.SubscriptionRequest{Matches: x.PrefixesToMatches(prefixes, ignore)}) |
| 1144 | if err != nil { |
| 1145 | return errors.Wrapf(err, "error from client.subscribe") |
| 1146 | } |
| 1147 | for { |
| 1148 | // Listen for updates. |
| 1149 | kvs, err := stream.Recv() |
| 1150 | if err != nil { |
| 1151 | return errors.Wrapf(err, "while receiving from stream") |
| 1152 | } |
| 1153 | cb(kvs) |
| 1154 | } |
| 1155 | } |
| 1156 | |
| 1157 | for { |
| 1158 | if err := listen(); err != nil { |
| 1159 | glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. closer err: %v\n", |
| 1160 | prefix, err, closer.Ctx().Err()) |
| 1161 | } |
| 1162 | if closer.Ctx().Err() != nil { |
| 1163 | return |
| 1164 | } |
| 1165 | time.Sleep(time.Second) |
| 1166 | } |
| 1167 | } |
no test coverage detected