(ctx context.Context)
| 289 | } |
| 290 | |
| 291 | func (m *mapper) processReqCh(ctx context.Context) error { |
| 292 | buf := z.NewBuffer(20<<20, "processKVList") |
| 293 | defer func() { |
| 294 | if err := buf.Release(); err != nil { |
| 295 | glog.Warningf("error in releasing buffer: %v", err) |
| 296 | } |
| 297 | }() |
| 298 | |
| 299 | maxNs := uint64(0) |
| 300 | maxUid := uint64(0) |
| 301 | |
| 302 | toBuffer := func(kv *bpb.KV, version uint64) error { |
| 303 | key := y.KeyWithTs(kv.Key, version) |
| 304 | sz := proto.Size(kv) |
| 305 | b := buf.SliceAllocate(2 + len(key) + sz) |
| 306 | |
| 307 | binary.BigEndian.PutUint16(b[0:2], uint16(len(key))) |
| 308 | x.AssertTrue(copy(b[2:], key) == len(key)) |
| 309 | _, err := x.MarshalToSizedBuffer(b[2+len(key):], kv) |
| 310 | return err |
| 311 | } |
| 312 | |
| 313 | processKV := func(in *loadBackupInput, kv *bpb.KV) error { |
| 314 | if len(kv.GetUserMeta()) != 1 { |
| 315 | return errors.Errorf( |
| 316 | "Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key)) |
| 317 | } |
| 318 | |
| 319 | restoreKey, ns, err := fromBackupKey(kv.Key) |
| 320 | if err != nil { |
| 321 | return errors.Wrap(err, "fromBackupKey") |
| 322 | } |
| 323 | |
| 324 | // Skip this key when the restore is namespace-aware and the key's namespace |
| 325 | // differs from fromNamespace. |
| 326 | if in.isNamespaceAwareRestore && ns != in.fromNamespace { |
| 327 | return nil |
| 328 | } |
| 329 | |
| 330 | // Filter keys using the preds set. Do not do this filtering for type keys |
| 331 | // as they are meant to be in every group and their Attr value does not |
| 332 | // match a predicate name. |
| 333 | parsedKey, err := x.Parse(restoreKey) |
| 334 | if err != nil { |
| 335 | return errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey)) |
| 336 | } |
| 337 | |
| 338 | // Update the local max uid and max namespace values. |
| 339 | maxUid = x.Max(maxUid, parsedKey.Uid) |
| 340 | if in.isNamespaceAwareRestore { |
| 341 | maxNs = x.Max(maxNs, 0) |
| 342 | } else { |
| 343 | maxNs = x.Max(maxNs, ns) |
| 344 | } |
| 345 | |
| 346 | if !in.keepSchema && (parsedKey.IsSchema() || parsedKey.IsType()) { |
| 347 | return nil |
| 348 | } |
no test coverage detected