sendDeltasToSubscribers reads updates from the o.updates channel, constructs a delta object containing transactions from one or more updates, and sends the delta object to each subscriber's channel.
()
| 183 | // constructs a delta object containing transactions from one or more updates |
| 184 | // and sends the delta object to each subscriber's channel |
| 185 | func (o *Oracle) sendDeltasToSubscribers() { |
| 186 | delta := &pb.OracleDelta{} |
| 187 | ticker := time.Tick(time.Second) |
| 188 | |
| 189 | // waitFor calculates the maximum value of delta.MaxAssigned and all the CommitTs of delta.Txns |
| 190 | waitFor := func() uint64 { |
| 191 | w := delta.MaxAssigned |
| 192 | for _, txn := range delta.Txns { |
| 193 | w = x.Max(w, txn.CommitTs) |
| 194 | } |
| 195 | return w |
| 196 | } |
| 197 | |
| 198 | for { |
| 199 | get_update: |
| 200 | var update *pb.OracleDelta |
| 201 | select { |
| 202 | case update = <-o.updates: |
| 203 | case <-ticker: |
| 204 | wait := waitFor() |
| 205 | if wait == 0 || o.doneUntil.DoneUntil() < wait { |
| 206 | goto get_update |
| 207 | } |
| 208 | // Send empty update. |
| 209 | update = &pb.OracleDelta{} |
| 210 | } |
| 211 | slurp_loop: |
| 212 | for { |
| 213 | delta.MaxAssigned = x.Max(delta.MaxAssigned, update.MaxAssigned) |
| 214 | delta.Txns = append(delta.Txns, update.Txns...) |
| 215 | select { |
| 216 | case update = <-o.updates: |
| 217 | default: |
| 218 | break slurp_loop |
| 219 | } |
| 220 | } |
| 221 | // No need to sort the txn updates here. Alpha would sort them before |
| 222 | // applying. |
| 223 | |
| 224 | // Let's ensure that we have all the commits up until the max here. |
| 225 | // Otherwise, we'll be sending commit timestamps out of order, which |
| 226 | // would cause Alphas to drop some of them, during writes to Badger. |
| 227 | if o.doneUntil.DoneUntil() < waitFor() { |
| 228 | continue // The for loop doing blocking reads from o.updates. |
| 229 | // We need at least one entry from the updates channel to pick up a missing update. |
| 230 | // Don't goto slurp_loop, because it would break from select immediately. |
| 231 | } |
| 232 | |
| 233 | if glog.V(3) { |
| 234 | glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta) |
| 235 | } |
| 236 | o.Lock() |
| 237 | for id, ch := range o.subscribers { |
| 238 | select { |
| 239 | case ch <- *delta: |
| 240 | default: |
| 241 | close(ch) |
| 242 | delete(o.subscribers, id) |