MCPcopy
hub / github.com/dgraph-io/dgraph / doStreamSnapshot

Function doStreamSnapshot

worker/snapshot.go:177–252  ·  view source on GitHub ↗
(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer)

Source from the content-addressed store, hash-verified

175}
176
177func doStreamSnapshot(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer) error {
178 // We choose not to try and match the requested snapshot from the latest snapshot at the leader.
179 // This is the job of the Raft library. At the leader end, we service whatever is asked of us.
180 // If this snapshot is old, Raft should cause the follower to request another one, to overwrite
181 // the data from this one.
182 //
183 // Snapshot request contains the txn read timestamp to be used to get a consistent snapshot of
184 // the data. This is what we use in orchestrate.
185 //
186 // Note: This would also pick up schema updates done "after" the snapshot index. Guess that
187 // might be OK. Otherwise, we'd want to version the schemas as well. Currently, they're stored
188 // at timestamp=1.
189
190 // We no longer check if this node is the leader, because the leader can switch between snapshot
191 // requests. Therefore, we wait until this node has reached snap.ReadTs, before servicing the
192 // request. Any other node in the group should have the same data as the leader, once it is past
193 // the read timestamp.
194 glog.Infof("Waiting to reach timestamp: %d", snap.ReadTs)
195 if err := posting.Oracle().WaitForTs(out.Context(), snap.ReadTs); err != nil {
196 return err
197 }
198
199 stream := pstore.NewStreamAt(snap.ReadTs)
200 stream.LogPrefix = "Sending Snapshot"
201 // Use the default implementation. We no longer try to generate a rolled up posting list here.
202 // Instead, we just stream out all the versions as they are.
203 stream.KeyToList = nil
204 stream.Send = func(buf *z.Buffer) error {
205 kvs := &pb.KVS{Data: buf.Bytes()}
206 return out.Send(kvs)
207 }
208 stream.ChooseKey = func(item *badger.Item) bool {
209 if item.Version() >= snap.SinceTs {
210 return true
211 }
212
213 if item.Version() != 1 {
214 return false
215 }
216
217 // Type and Schema keys always have a timestamp of 1. They all need to be sent
218 // with the snapshot.
219 pk, err := x.Parse(item.Key())
220 if err != nil {
221 return false
222 }
223 return pk.IsSchema() || pk.IsType()
224 }
225
226 // Get the list of all the predicate and types at the time of the snapshot so that the receiver
227 // can delete predicates
228 predicates := schema.State().Predicates()
229 types := schema.State().Types()
230
231 if err := stream.Orchestrate(out.Context()); err != nil {
232 return err
233 }
234

Callers 1

StreamSnapshotMethod · 0.85

Calls 12

OracleFunction · 0.92
ParseFunction · 0.92
StateFunction · 0.92
InfofMethod · 0.80
WaitForTsMethod · 0.80
IsSchemaMethod · 0.80
IsTypeMethod · 0.80
PredicatesMethod · 0.80
TypesMethod · 0.80
SendMethod · 0.65
RecvMethod · 0.65
KeyMethod · 0.45

Tested by

no test coverage detected