MCPcopy Index your code
hub / github.com/dgraph-io/dgraph / retrieveSnapshot

Method retrieveSnapshot

worker/draft.go:1056–1106  ·  view source on GitHub ↗
(snap *pb.Snapshot)

Source from the content-addressed store, hash-verified

1054}
1055
1056func (n *node) retrieveSnapshot(snap *pb.Snapshot) error {
1057 closer, err := n.startTask(opSnapshot)
1058 if err != nil {
1059 return err
1060 }
1061 defer closer.Done()
1062
1063 // In some edge cases, the Zero leader might not have been able to update
1064 // the status of Alpha leader. So, instead of blocking forever on waiting
1065 // for Zero to send us the updates info about the leader, we can just use
1066 // the Snapshot RaftContext, which contains the address of the leader.
1067 var pool *conn.Pool
1068 addr := snap.Context.GetAddr()
1069 glog.V(2).Infof("Snapshot.RaftContext.Addr: %q", addr)
1070 if len(addr) > 0 {
1071 p, err := conn.GetPools().Get(addr)
1072 if err != nil {
1073 glog.V(2).Infof("conn.Get(%q) Error: %v", addr, err)
1074 } else {
1075 pool = p
1076 glog.V(2).Infof("Leader connection picked from RaftContext")
1077 }
1078 }
1079 if pool == nil {
1080 glog.V(2).Infof("No leader conn from RaftContext. Using membership state.")
1081 p, err := n.leaderBlocking()
1082 if err != nil {
1083 return err
1084 }
1085 pool = p
1086 }
1087
1088 // Need to clear pl's stored in memory for the case when retrieving snapshot with
1089 // index greater than this node's last index
1090 // Should invalidate/remove pl's to this group only ideally
1091 //
1092 // We can safely evict posting lists from memory. Because, all the updates corresponding to txn
1093 // commits up until then have already been written to pstore. And the way we take snapshots, we
1094 // keep all the pre-writes for a pending transaction, so they will come back to memory, as Raft
1095 // logs are replayed.
1096 if err := n.populateSnapshot(snap, pool); err != nil {
1097 return errors.Wrapf(err, "cannot retrieve snapshot from peer")
1098 }
1099 // Populate shard stores the streamed data directly into db, so we need to refresh
1100 // schema for current group id
1101 if err := schema.LoadFromDb(closer.Ctx()); err != nil {
1102 return errors.Wrapf(err, "while initializing schema")
1103 }
1104 groups().triggerMembershipSync()
1105 return nil
1106}
1107
1108func (n *node) proposeCDCState(ts uint64) error {
1109 proposal := &pb.Proposal{

Callers 1

RunMethod · 0.95

Calls 12

startTaskMethod · 0.95
leaderBlockingMethod · 0.95
populateSnapshotMethod · 0.95
GetPoolsFunction · 0.92
LoadFromDbFunction · 0.92
groupsFunction · 0.85
InfofMethod · 0.80
triggerMembershipSyncMethod · 0.80
GetMethod · 0.65
DoneMethod · 0.45
GetAddrMethod · 0.45
CtxMethod · 0.45

Tested by

no test coverage detected