(snap *pb.Snapshot)
| 1054 | } |
| 1055 | |
| 1056 | func (n *node) retrieveSnapshot(snap *pb.Snapshot) error { |
| 1057 | closer, err := n.startTask(opSnapshot) |
| 1058 | if err != nil { |
| 1059 | return err |
| 1060 | } |
| 1061 | defer closer.Done() |
| 1062 | |
| 1063 | // In some edge cases, the Zero leader might not have been able to update |
| 1064 | // the status of Alpha leader. So, instead of blocking forever on waiting |
| 1065 | // for Zero to send us the updates info about the leader, we can just use |
| 1066 | // the Snapshot RaftContext, which contains the address of the leader. |
| 1067 | var pool *conn.Pool |
| 1068 | addr := snap.Context.GetAddr() |
| 1069 | glog.V(2).Infof("Snapshot.RaftContext.Addr: %q", addr) |
| 1070 | if len(addr) > 0 { |
| 1071 | p, err := conn.GetPools().Get(addr) |
| 1072 | if err != nil { |
| 1073 | glog.V(2).Infof("conn.Get(%q) Error: %v", addr, err) |
| 1074 | } else { |
| 1075 | pool = p |
| 1076 | glog.V(2).Infof("Leader connection picked from RaftContext") |
| 1077 | } |
| 1078 | } |
| 1079 | if pool == nil { |
| 1080 | glog.V(2).Infof("No leader conn from RaftContext. Using membership state.") |
| 1081 | p, err := n.leaderBlocking() |
| 1082 | if err != nil { |
| 1083 | return err |
| 1084 | } |
| 1085 | pool = p |
| 1086 | } |
| 1087 | |
| 1088 | // Need to clear pl's stored in memory for the case when retrieving snapshot with |
| 1089 | // index greater than this node's last index |
| 1090 | // Should invalidate/remove pl's to this group only ideally |
| 1091 | // |
| 1092 | // We can safely evict posting lists from memory. Because, all the updates corresponding to txn |
| 1093 | // commits up until then have already been written to pstore. And the way we take snapshots, we |
| 1094 | // keep all the pre-writes for a pending transaction, so they will come back to memory, as Raft |
| 1095 | // logs are replayed. |
| 1096 | if err := n.populateSnapshot(snap, pool); err != nil { |
| 1097 | return errors.Wrapf(err, "cannot retrieve snapshot from peer") |
| 1098 | } |
| 1099 | // Populate shard stores the streamed data directly into db, so we need to refresh |
| 1100 | // schema for current group id |
| 1101 | if err := schema.LoadFromDb(closer.Ctx()); err != nil { |
| 1102 | return errors.Wrapf(err, "while initializing schema") |
| 1103 | } |
| 1104 | groups().triggerMembershipSync() |
| 1105 | return nil |
| 1106 | } |
| 1107 | |
| 1108 | func (n *node) proposeCDCState(ts uint64) error { |
| 1109 | proposal := &pb.Proposal{ |
no test coverage detected