MCPcopy
hub / github.com/alibaba/MongoShake / executeInParallel

Method executeInParallel

executor/executor.go:131–167  ·  view source on GitHub ↗
(logs []*OplogRecord)

Source from the content-addressed store, hash-verified

129}
130
131func (batchExecutor *BatchGroupExecutor) executeInParallel(logs []*OplogRecord) {
132 // prepare execution monitor
133 latch := new(sync.WaitGroup)
134 latch.Add(len(logs))
135 // shard oplogRecords by _id primary key and make up callback chain
136 var buffer = make([][]*OplogRecord, len(batchExecutor.executors))
137 shardKey := oplog.PrimaryKeyHasher{}
138 var completionList []func()
139 for _, log := range logs {
140 selected := shardKey.DistributeOplogByMod(log.original.partialLog, len(batchExecutor.executors))
141 buffer[selected] = append(buffer[selected], log)
142 if log.original.callback != nil {
143 // should be ordered by the incoming sequence
144 completionList = append(completionList, log.original.callback)
145 }
146 }
147 for index, buf := range buffer {
148 if len(buf) != 0 {
149 nimo.AssertTrue(len(batchExecutor.executors[index].batchBlock) == 0, "executors buffer is not empty!")
150 nimo.AssertTrue(batchExecutor.executors[index].finisher == nil, "executors await status is wrong!")
151 batchExecutor.executors[index].finisher = latch
152 // follow the MEMORY MODEL : finisher should be assigned
153 // before batchBlock channel. it read after <- batchBlock
154 batchExecutor.executors[index].batchBlock <- buf
155 }
156 }
157 // wait for execute completely
158 latch.Wait()
159 // invoke all callbacks
160 for _, callback := range completionList {
161 callback()
162 }
163 // sweep executors' block buffer and await
164 for _, exec := range batchExecutor.executors {
165 exec.finisher = nil
166 }
167}
168
169type Executor struct {
170 // sequence index id in each replayer

Callers 1

replayMethod · 0.95

Calls 2

DistributeOplogByModMethod · 0.95
WaitMethod · 0.80

Tested by

no test coverage detected