(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer)
| 175 | } |
| 176 | |
| 177 | func doStreamSnapshot(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer) error { |
| 178 | // We choose not to try and match the requested snapshot from the latest snapshot at the leader. |
| 179 | // This is the job of the Raft library. At the leader end, we service whatever is asked of us. |
| 180 | // If this snapshot is old, Raft should cause the follower to request another one, to overwrite |
| 181 | // the data from this one. |
| 182 | // |
| 183 | // Snapshot request contains the txn read timestamp to be used to get a consistent snapshot of |
| 184 | // the data. This is what we use in orchestrate. |
| 185 | // |
| 186 | // Note: This would also pick up schema updates done "after" the snapshot index. Guess that |
| 187 | // might be OK. Otherwise, we'd want to version the schemas as well. Currently, they're stored |
| 188 | // at timestamp=1. |
| 189 | |
| 190 | // We no longer check if this node is the leader, because the leader can switch between snapshot |
| 191 | // requests. Therefore, we wait until this node has reached snap.ReadTs, before servicing the |
| 192 | // request. Any other node in the group should have the same data as the leader, once it is past |
| 193 | // the read timestamp. |
| 194 | glog.Infof("Waiting to reach timestamp: %d", snap.ReadTs) |
| 195 | if err := posting.Oracle().WaitForTs(out.Context(), snap.ReadTs); err != nil { |
| 196 | return err |
| 197 | } |
| 198 | |
| 199 | stream := pstore.NewStreamAt(snap.ReadTs) |
| 200 | stream.LogPrefix = "Sending Snapshot" |
| 201 | // Use the default implementation. We no longer try to generate a rolled up posting list here. |
| 202 | // Instead, we just stream out all the versions as they are. |
| 203 | stream.KeyToList = nil |
| 204 | stream.Send = func(buf *z.Buffer) error { |
| 205 | kvs := &pb.KVS{Data: buf.Bytes()} |
| 206 | return out.Send(kvs) |
| 207 | } |
| 208 | stream.ChooseKey = func(item *badger.Item) bool { |
| 209 | if item.Version() >= snap.SinceTs { |
| 210 | return true |
| 211 | } |
| 212 | |
| 213 | if item.Version() != 1 { |
| 214 | return false |
| 215 | } |
| 216 | |
| 217 | // Type and Schema keys always have a timestamp of 1. They all need to be sent |
| 218 | // with the snapshot. |
| 219 | pk, err := x.Parse(item.Key()) |
| 220 | if err != nil { |
| 221 | return false |
| 222 | } |
| 223 | return pk.IsSchema() || pk.IsType() |
| 224 | } |
| 225 | |
| 226 | // Get the list of all the predicate and types at the time of the snapshot so that the receiver |
| 227 | // can delete predicates |
| 228 | predicates := schema.State().Predicates() |
| 229 | types := schema.State().Types() |
| 230 | |
| 231 | if err := stream.Orchestrate(out.Context()); err != nil { |
| 232 | return err |
| 233 | } |
| 234 |
no test coverage detected