MCPcopy
hub / github.com/alibaba/MongoShake / replay

Method replay

executor/executor.go:95–125  ·  view source on GitHub ↗
(logs []*PartialLogWithCallback)

Source from the content-addressed store, hash-verified

93}
94
95func (batchExecutor *BatchGroupExecutor) replay(logs []*PartialLogWithCallback) {
96 // TODO: skip the oplogRecords which has been replayed
97 // lastTs := utils.TimestampToInt64(logs[len(logs)-1].partialLog.Timestamp)
98 // if batchExecutor.replayer.Ack >= lastTs {
99 // // every oplog in buffer have been already executed in previously
100 // // so discard them simply. Even the smaller timestamp oplogRecords has
101 // // been changed(other collector or other mongos source)
102 // return
103 // }
104
105 // executor needs to check pausing or throttle here.
106 batchExecutor.replicateShouldStall(int64(len(logs)))
107
108 // In mongo shard cluster. our request goes into mongos. it's safe for
109 // unique index without collision detection
110 var matrix CollisionMatrix = &NoopMatrix{}
111 if conf.Options.IncrSyncCollisionEnable {
112 matrix = NewBarrierMatrix()
113 }
114
115 // firstly. we split the oplogRecords into segments which are the unit
116 // of safety execution. it means there is no any operations
117 // on the safe unique index in the single segment.
118 var segments = matrix.split(logs)
119 // secondly. in each segment, we analyze the dependence between
120 // each oplogRecords. And
121 for _, segment := range segments {
122 toBeExecuted := matrix.convert(segment)
123 batchExecutor.executeInParallel(toBeExecuted)
124 }
125}
126
127// TODO
128func (batchExecutor *BatchGroupExecutor) replicateShouldStall(n int64) {

Callers 1

SyncMethod · 0.95

Calls 5

replicateShouldStallMethod · 0.95
splitMethod · 0.95
convertMethod · 0.95
executeInParallelMethod · 0.95
NewBarrierMatrixFunction · 0.85

Tested by

no test coverage detected