BatchMore ** * This function is used to gather oplogs together. * Honestly speaking, it is complicated, so reading the unit tests may help make it clearer. * The reason this function is so complicated is that there are too many corner cases involved. * Return batched oplogs and barrier f
()
| 277 | * | | |
| 278 | */ |
| 279 | func (batcher *Batcher) BatchMore() (genericOplogs [][]*oplog.GenericOplog, barrier bool, allEmpty bool, exit bool) { |
| 280 | // picked raw oplogs and batching in sequence |
| 281 | batcher.batchGroup = make([][]*oplog.GenericOplog, len(batcher.workerGroup)) |
| 282 | if batcher.barrierOplogs == nil { |
| 283 | batcher.barrierOplogs = make([]*oplog.GenericOplog, 0) |
| 284 | } |
| 285 | |
| 286 | // There are pending barrier oplogs to be performed |
| 287 | if len(batcher.barrierOplogs) > 0 { |
| 288 | for _, v := range batcher.barrierOplogs { |
| 289 | if batcher.filter(v.Parsed) { |
| 290 | batcher.lastFilterOplog = v.Parsed |
| 291 | continue |
| 292 | } |
| 293 | if ddlFilter.Filter(v.Parsed) && !conf.Options.FilterDDLEnable { |
| 294 | batcher.lastFilterOplog = v.Parsed |
| 295 | continue |
| 296 | } |
| 297 | |
| 298 | batcher.addIntoBatchGroup(v, true) |
| 299 | //l.Logger.Infof("%s transfer barrierOplogs into batchGroup, i[%d], oplog[%v]", batcher.syncer, i, v.Parsed) |
| 300 | } |
| 301 | batcher.barrierOplogs = nil |
| 302 | |
| 303 | return batcher.batchGroup, true, batcher.setLastOplog(), false |
| 304 | } |
| 305 | |
| 306 | // try to get batch |
| 307 | mergeBatch, exit := batcher.getBatchWithDelay() |
| 308 | |
| 309 | if mergeBatch == nil { |
| 310 | return batcher.batchGroup, false, batcher.setLastOplog(), exit |
| 311 | } |
| 312 | |
| 313 | for i, genericLog := range mergeBatch { |
| 314 | // filter oplogs such as Noop or those with gid |
| 315 | // PAY ATTENTION: we can't handle the oplog in transaction that has been filtered |
| 316 | if batcher.filter(genericLog.Parsed) { |
| 317 | // don't push to worker, set lastFilterOplog |
| 318 | batcher.lastFilterOplog = genericLog.Parsed |
| 319 | //l.Logger.Debugf("~~~~~~~~~filter %v %v", i, genericLog.Parsed) |
| 320 | continue |
| 321 | } |
| 322 | |
| 323 | // Transaction |
| 324 | if txnMeta, txnOk := batcher.isTransaction(genericLog.Parsed); txnOk { |
| 325 | //l.Logger.Debugf("~~~~~~~~~transaction %v %v", i, genericLog.Parsed) |
| 326 | isRet, mustIndividual, deliveredOps := batcher.handleTransaction(txnMeta, genericLog) |
| 327 | if !isRet { |
| 328 | continue |
| 329 | } |
| 330 | if mustIndividual { |
| 331 | batcher.barrierOplogs = deliveredOps |
| 332 | |
| 333 | batcher.remainLogs = mergeBatch[i+1:] |
| 334 | |
| 335 | allEmpty := batcher.setLastOplog() |
| 336 | nimo.AssertTrue(allEmpty == true, "batcher.batchGroup don't be empty") |