MCPcopy
hub / github.com/dgraph-io/dgraph / runSchemaMutation

Function runSchemaMutation

worker/mutation.go:133–279  ·  view source on GitHub ↗
(ctx context.Context, updates []*pb.SchemaUpdate, startTs uint64)

Source from the content-addressed store, hash-verified

131}
132
133func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs uint64) error {
134 if len(updates) == 0 {
135 return nil
136 }
137 // Wait until schema modification for all predicates is complete. Two background tasks must
138 // not run at the same time, as that would be a race condition. We typically won't propose an
139 // index update if one is already going on. If that's not the case, then the receiver
140 // of the update had probably finished the previous index update but some follower
141 // (or perhaps leader) had not finished it.
142 // In other words, the proposer checks whether there is another indexing in progress.
143 // If that's the case, the alter request is rejected. Otherwise, the request is accepted.
144 // Before reaching here, the proposer P would have checked that no indexing is in progress
145 // (could also be because proposer was done earlier than others). If P was still indexing
146 // when the req was received, it would have rejected the Alter request. Only if P is
147 // not indexing, it would accept and propose the request.
148 // It is possible that a receiver R of the proposal is still indexing. In that case, R would
149 // block here and wait for indexing to be finished.
150 gr.Node.waitForTask(opIndexing)
151
152 // done is used to ensure that we only stop the indexing task once.
153 var done uint32
154 start := time.Now()
155 stopIndexing := func(closer *z.Closer) {
156 // runSchemaMutation can return while stopIndexing may still be called by goroutines.
157 if !schema.State().IndexingInProgress() {
158 if atomic.CompareAndSwapUint32(&done, 0, 1) {
159 closer.Done()
160 // The time check is here so that we do not propose a snapshot too frequently.
161 if time.Since(start) < 10*time.Second || !gr.Node.AmLeader() {
162 return
163 }
164 if err := gr.Node.proposeSnapshot(); err != nil {
165 glog.Errorf("error in proposing snapshot: %v", err)
166 }
167 }
168 }
169 }
170
171 buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
172 wrtCtx := schema.GetWriteContext(context.Background())
173 if err := rebuild.BuildIndexes(wrtCtx); err != nil {
174 return err
175 }
176 if err := updateSchema(update, rebuild.StartTs); err != nil {
177 return err
178 }
179
180 glog.Infof("Done schema update %+v\n", update)
181 return nil
182 }
183
184 // This wg allows waiting until setup for all the predicates is complete
185 // before running buildIndexes for any of those predicates.
186 var wg sync.WaitGroup
187 wg.Add(1)
188 defer wg.Done()
189
190 // Badger opens around 8 files for indexing per predicate.

Callers 1

applyMutationsMethod · 0.85

Calls 15

BuildIndexesMethod · 0.95
NeedIndexRebuildMethod · 0.95
GetQuerySchemaMethod · 0.95
DropIndexesMethod · 0.95
BuildDataMethod · 0.95
StateFunction · 0.92
GetWriteContextFunction · 0.92
CheckFunction · 0.92
undoSchemaUpdateFunction · 0.85
groupsFunction · 0.85
waitForTaskMethod · 0.80
IndexingInProgressMethod · 0.80

Tested by

no test coverage detected