(ctx context.Context, updates []*pb.SchemaUpdate, startTs uint64)
| 131 | } |
| 132 | |
| 133 | func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs uint64) error { |
| 134 | if len(updates) == 0 { |
| 135 | return nil |
| 136 | } |
| 137 | // Wait until schema modification for all predicates is complete. Two background indexing |
| 138 | // tasks must never run concurrently, as that would be a race. We typically won't propose an |
| 139 | // index update if one is already going on. If that's not the case, then the receiver |
| 140 | // of the update had probably finished the previous index update but some follower |
| 141 | // (or perhaps leader) had not finished it. |
| 142 | // In other words, the proposer checks whether there is another indexing in progress. |
| 143 | // If that's the case, the alter request is rejected. Otherwise, the request is accepted. |
| 144 | // Before reaching here, the proposer P would have checked that no indexing is in progress |
| 145 | // (could also be because proposer was done earlier than others). If P was still indexing |
| 146 | // when the request was received, it would have rejected the Alter request. P accepts and |
| 147 | // proposes the request only if it is not indexing. |
| 148 | // It is possible that a receiver R of the proposal is still indexing. In that case, R would |
| 149 | // block here and wait for indexing to be finished. |
| 150 | gr.Node.waitForTask(opIndexing) |
| 151 | |
| 152 | // done is used to ensure that we only stop the indexing task once. |
| 153 | var done uint32 |
| 154 | start := time.Now() |
| 155 | stopIndexing := func(closer *z.Closer) { |
| 156 | // runSchemaMutation may already have returned; stopIndexing can be called from other goroutines. |
| 157 | if !schema.State().IndexingInProgress() { |
| 158 | if atomic.CompareAndSwapUint32(&done, 0, 1) { |
| 159 | closer.Done() |
| 160 | // Time check is here so that we do not propose snapshot too frequently. |
| 161 | if time.Since(start) < 10*time.Second || !gr.Node.AmLeader() { |
| 162 | return |
| 163 | } |
| 164 | if err := gr.Node.proposeSnapshot(); err != nil { |
| 165 | glog.Errorf("error in proposing snapshot: %v", err) |
| 166 | } |
| 167 | } |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error { |
| 172 | wrtCtx := schema.GetWriteContext(context.Background()) |
| 173 | if err := rebuild.BuildIndexes(wrtCtx); err != nil { |
| 174 | return err |
| 175 | } |
| 176 | if err := updateSchema(update, rebuild.StartTs); err != nil { |
| 177 | return err |
| 178 | } |
| 179 | |
| 180 | glog.Infof("Done schema update %+v\n", update) |
| 181 | return nil |
| 182 | } |
| 183 | |
| 184 | // This wg allows waiting until setup for all the predicates is complete |
| 185 | // before running buildIndexes for any of those predicates. |
| 186 | var wg sync.WaitGroup |
| 187 | wg.Add(1) |
| 188 | defer wg.Done() |
| 189 | |
| 190 | // Badger opens around 8 files for indexing per predicate. |
no test coverage detected