MCPcopy
hub / github.com/dgraph-io/dgraph / applyCommitted

Method applyCommitted

worker/draft.go:617–829  ·  view source on GitHub ↗
(proposal *pb.Proposal, key uint64)

Source from the content-addressed store, hash-verified

615}
616
617func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
618 start := time.Now()
619 defer func() {
620 since := time.Since(start)
621 if since > 30*time.Second {
622 glog.Warningf("applyCh entry took [%v] to handle: %#v", since, proposal)
623 }
624 }()
625
626 ctx := n.Ctx(key)
627 span := trace.SpanFromContext(ctx)
628 span.AddEvent("Node.applyCommited", trace.WithAttributes(
629 attribute.Int64("node id", int64(n.Id)),
630 attribute.Int64("Group Id", int64(n.gid)),
631 attribute.Int64("proposal key", int64(key))))
632
633 if proposal.Mutations != nil {
634 // syncmarks for this shouldn't be marked done until it's committed.
635 span.AddEvent("Applying mutations")
636 if err := n.applyMutations(ctx, proposal); err != nil {
637 span.AddEvent("While applying mutations", trace.WithAttributes(
638 attribute.String("error", err.Error())))
639 return err
640 }
641
642 span.AddEvent("Done")
643 return nil
644 }
645
646 switch {
647 case len(proposal.Kv) > 0:
648 return populateKeyValues(ctx, proposal.Kv)
649
650 case proposal.State != nil:
651 span.AddEvent("Applying state for key: %s", trace.WithAttributes(
652 attribute.Int64("key", int64(key))))
653 // This state needn't be snapshotted in this group, on restart we would fetch
654 // a state which is latest or equal to this.
655 groups().applyState(groups().Node.Id, proposal.State)
656 return nil
657
658 case len(proposal.CleanPredicate) > 0:
659 span.AddEvent("Cleaning predicate: %s", trace.WithAttributes(
660 attribute.String("predicate", proposal.CleanPredicate)))
661 end := time.Now().Add(10 * time.Second)
662 for proposal.ExpectedChecksum > 0 && time.Now().Before(end) {
663 cur := atomic.LoadUint64(&groups().membershipChecksum)
664 if proposal.ExpectedChecksum == cur {
665 break
666 }
667 time.Sleep(100 * time.Millisecond)
668 glog.Infof("Waiting for checksums to match. Expected: %d. Current: %d\n",
669 proposal.ExpectedChecksum, cur)
670 }
671 if time.Now().After(end) {
672 glog.Warningf(
673 "Giving up on predicate deletion: %q due to timeout. Wanted checksum: %d.",
674 proposal.CleanPredicate, proposal.ExpectedChecksum)

Callers 1

processApplyChMethod · 0.95

Calls 15

CtxMethod · 0.95
applyMutationsMethod · 0.95
commitOrAbortMethod · 0.95
populateSnapshotMethod · 0.95
startTaskMethod · 0.95
DeletePredicateFunction · 0.92
DeleteAllFunction · 0.92
CheckFunction · 0.92
UpdateDrainingModeFunction · 0.92
AssertTrueFunction · 0.92

Tested by

no test coverage detected