(logs []*PartialLogWithCallback)
| 93 | } |
| 94 | |
| 95 | func (batchExecutor *BatchGroupExecutor) replay(logs []*PartialLogWithCallback) { |
| 96 | // TODO: skip the oplogRecords which has been replayed |
| 97 | // lastTs := utils.TimestampToInt64(logs[len(logs)-1].partialLog.Timestamp) |
| 98 | // if batchExecutor.replayer.Ack >= lastTs { |
| 99 | // // every oplog in buffer have been already executed in previously |
| 100 | // // so discard them simply. Even the smaller timestamp oplogRecords has |
| 101 | // // been changed(other collector or other mongos source) |
| 102 | // return |
| 103 | // } |
| 104 | |
| 105 | // executor needs to check pausing or throttle here. |
| 106 | batchExecutor.replicateShouldStall(int64(len(logs))) |
| 107 | |
| 108 | // In mongo shard cluster. our request goes into mongos. it's safe for |
| 109 | // unique index without collision detection |
| 110 | var matrix CollisionMatrix = &NoopMatrix{} |
| 111 | if conf.Options.IncrSyncCollisionEnable { |
| 112 | matrix = NewBarrierMatrix() |
| 113 | } |
| 114 | |
| 115 | // firstly. we split the oplogRecords into segments which are the unit |
| 116 | // of safety execution. it means there is no any operations |
| 117 | // on the safe unique index in the single segment. |
| 118 | var segments = matrix.split(logs) |
| 119 | // secondly. in each segment, we analyze the dependence between |
| 120 | // each oplogRecords. And |
| 121 | for _, segment := range segments { |
| 122 | toBeExecuted := matrix.convert(segment) |
| 123 | batchExecutor.executeInParallel(toBeExecuted) |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | // TODO |
| 128 | func (batchExecutor *BatchGroupExecutor) replicateShouldStall(n int64) { |
no test coverage detected