We don't support schema mutations across nodes in a transaction. Wait for all transactions to either abort or complete and all write transactions involving the predicate are aborted until schema mutations are done.
(ctx context.Context, proposal *pb.Proposal)
| 332 | // Wait for all transactions to either abort or complete and all write transactions |
| 333 | // involving the predicate are aborted until schema mutations are done. |
| 334 | func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr error) { |
| 335 | span := trace.SpanFromContext(ctx) |
| 336 | |
| 337 | if proposal.Mutations.DropOp == pb.Mutations_ALL_IN_NS { |
| 338 | ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) |
| 339 | if err != nil { |
| 340 | return err |
| 341 | } |
| 342 | // Ensures nothing get written to disk due to commit proposals. |
| 343 | posting.Oracle().ResetTxnsForNs(ns) |
| 344 | if err := posting.DeleteAllForNs(ns); err != nil { |
| 345 | return err |
| 346 | } |
| 347 | |
| 348 | // TODO: What about multi shard cluster? |
| 349 | // It should be okay to set the schema at timestamp 1 after drop all operation. |
| 350 | if groups().groupId() == 1 { |
| 351 | initialSchema := schema.InitialSchema(ns) |
| 352 | for _, s := range initialSchema { |
| 353 | if err := applySchema(s, 1); err != nil { |
| 354 | return err |
| 355 | } |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | // Propose initial types as well after a drop all as they would have been cleared. |
| 360 | initialTypes := schema.InitialTypes(ns) |
| 361 | for _, t := range initialTypes { |
| 362 | if err := updateType(t.GetTypeName(), t, 1); err != nil { |
| 363 | return err |
| 364 | } |
| 365 | } |
| 366 | |
| 367 | // TODO: Revisit this when we work on posting cache. Don't clear entire cache. |
| 368 | // We don't want to drop entire cache, just due to one namespace. |
| 369 | posting.ResetCache() |
| 370 | return nil |
| 371 | } |
| 372 | |
| 373 | if proposal.Mutations.DropOp == pb.Mutations_DATA { |
| 374 | ns, err := strconv.ParseUint(proposal.Mutations.DropValue, 0, 64) |
| 375 | if err != nil { |
| 376 | return err |
| 377 | } |
| 378 | // Ensures nothing get written to disk due to commit proposals. |
| 379 | posting.Oracle().ResetTxnsForNs(ns) |
| 380 | if err := posting.DeleteData(ns); err != nil { |
| 381 | return err |
| 382 | } |
| 383 | |
| 384 | // TODO: Revisit this when we work on posting cache. Don't clear entire cache. |
| 385 | // We don't want to drop entire cache, just due to one namespace. |
| 386 | posting.ResetCache() |
| 387 | return nil |
| 388 | } |
| 389 | |
| 390 | if proposal.Mutations.DropOp == pb.Mutations_ALL { |
| 391 | // Ensures nothing get written to disk due to commit proposals. |
no test coverage detected