MCPcopy
hub / github.com/alibaba/MongoShake / BatchMore

Method BatchMore

collector/batcher.go:279–392  ·  view source on GitHub ↗

BatchMore: This function is used to gather oplogs together. Honestly speaking, it's complicated, so reading the unit tests may help make it clearer. The reason this function is so complicated is that there are too many corner cases involved. Return batched oplogs and barrier f…

()

Source from the content-addressed store, hash-verified

277 * | |
278 */
279func (batcher *Batcher) BatchMore() (genericOplogs [][]*oplog.GenericOplog, barrier bool, allEmpty bool, exit bool) {
280 // picked raw oplogs and batching in sequence
281 batcher.batchGroup = make([][]*oplog.GenericOplog, len(batcher.workerGroup))
282 if batcher.barrierOplogs == nil {
283 batcher.barrierOplogs = make([]*oplog.GenericOplog, 0)
284 }
285
286 // Have barrier Oplogs to performed
287 if len(batcher.barrierOplogs) > 0 {
288 for _, v := range batcher.barrierOplogs {
289 if batcher.filter(v.Parsed) {
290 batcher.lastFilterOplog = v.Parsed
291 continue
292 }
293 if ddlFilter.Filter(v.Parsed) && !conf.Options.FilterDDLEnable {
294 batcher.lastFilterOplog = v.Parsed
295 continue
296 }
297
298 batcher.addIntoBatchGroup(v, true)
299 //l.Logger.Infof("%s transfer barrierOplogs into batchGroup, i[%d], oplog[%v]", batcher.syncer, i, v.Parsed)
300 }
301 batcher.barrierOplogs = nil
302
303 return batcher.batchGroup, true, batcher.setLastOplog(), false
304 }
305
306 // try to get batch
307 mergeBatch, exit := batcher.getBatchWithDelay()
308
309 if mergeBatch == nil {
310 return batcher.batchGroup, false, batcher.setLastOplog(), exit
311 }
312
313 for i, genericLog := range mergeBatch {
314 // filter oplog such like Noop or with gid
315 // PAY ATTENTION: we can't handle the oplog in transaction that has been filtered
316 if batcher.filter(genericLog.Parsed) {
317 // don't push to worker, set lastFilterOplog
318 batcher.lastFilterOplog = genericLog.Parsed
319 //l.Logger.Debugf("~~~~~~~~~filter %v %v", i, genericLog.Parsed)
320 continue
321 }
322
323 // Transaction
324 if txnMeta, txnOk := batcher.isTransaction(genericLog.Parsed); txnOk {
325 //l.Logger.Debugf("~~~~~~~~~transaction %v %v", i, genericLog.Parsed)
326 isRet, mustIndividual, deliveredOps := batcher.handleTransaction(txnMeta, genericLog)
327 if !isRet {
328 continue
329 }
330 if mustIndividual {
331 batcher.barrierOplogs = deliveredOps
332
333 batcher.remainLogs = mergeBatch[i+1:]
334
335 allEmpty := batcher.setLastOplog()
336 nimo.AssertTrue(allEmpty == true, "batcher.batchGroup don't be empty")

Callers 3

TestBatchMoreFunction · 0.95
startBatcherMethod · 0.80

Calls 11

filterMethod · 0.95
addIntoBatchGroupMethod · 0.95
setLastOplogMethod · 0.95
getBatchWithDelayMethod · 0.95
isTransactionMethod · 0.95
handleTransactionMethod · 0.95
ExtraCommandNameFunction · 0.92
ExtractInnerOpsFunction · 0.92
PanicfMethod · 0.80
AddFilterMethod · 0.80
FilterMethod · 0.65

Tested by 2

TestBatchMoreFunction · 0.76