(proposal *pb.Proposal, key uint64)
| 615 | } |
| 616 | |
| 617 | func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error { |
| 618 | start := time.Now() |
| 619 | defer func() { |
| 620 | since := time.Since(start) |
| 621 | if since > 30*time.Second { |
| 622 | glog.Warningf("applyCh entry took [%v] to handle: %#v", since, proposal) |
| 623 | } |
| 624 | }() |
| 625 | |
| 626 | ctx := n.Ctx(key) |
| 627 | span := trace.SpanFromContext(ctx) |
| 628 | span.AddEvent("Node.applyCommited", trace.WithAttributes( |
| 629 | attribute.Int64("node id", int64(n.Id)), |
| 630 | attribute.Int64("Group Id", int64(n.gid)), |
| 631 | attribute.Int64("proposal key", int64(key)))) |
| 632 | |
| 633 | if proposal.Mutations != nil { |
| 634 | // syncmarks for this shouldn't be marked done until it's committed. |
| 635 | span.AddEvent("Applying mutations") |
| 636 | if err := n.applyMutations(ctx, proposal); err != nil { |
| 637 | span.AddEvent("While applying mutations", trace.WithAttributes( |
| 638 | attribute.String("error", err.Error()))) |
| 639 | return err |
| 640 | } |
| 641 | |
| 642 | span.AddEvent("Done") |
| 643 | return nil |
| 644 | } |
| 645 | |
| 646 | switch { |
| 647 | case len(proposal.Kv) > 0: |
| 648 | return populateKeyValues(ctx, proposal.Kv) |
| 649 | |
| 650 | case proposal.State != nil: |
| 651 | span.AddEvent("Applying state for key: %s", trace.WithAttributes( |
| 652 | attribute.Int64("key", int64(key)))) |
| 653 | // This state needn't be snapshotted in this group, on restart we would fetch |
| 654 | // a state which is latest or equal to this. |
| 655 | groups().applyState(groups().Node.Id, proposal.State) |
| 656 | return nil |
| 657 | |
| 658 | case len(proposal.CleanPredicate) > 0: |
| 659 | span.AddEvent("Cleaning predicate: %s", trace.WithAttributes( |
| 660 | attribute.String("predicate", proposal.CleanPredicate))) |
| 661 | end := time.Now().Add(10 * time.Second) |
| 662 | for proposal.ExpectedChecksum > 0 && time.Now().Before(end) { |
| 663 | cur := atomic.LoadUint64(&groups().membershipChecksum) |
| 664 | if proposal.ExpectedChecksum == cur { |
| 665 | break |
| 666 | } |
| 667 | time.Sleep(100 * time.Millisecond) |
| 668 | glog.Infof("Waiting for checksums to match. Expected: %d. Current: %d\n", |
| 669 | proposal.ExpectedChecksum, cur) |
| 670 | } |
| 671 | if time.Now().After(end) { |
| 672 | glog.Warningf( |
| 673 | "Giving up on predicate deletion: %q due to timeout. Wanted checksum: %d.", |
| 674 | proposal.CleanPredicate, proposal.ExpectedChecksum) |
no test coverage detected