MCPcopy
hub / github.com/dgraph-io/dgraph / processReqCh

Method processReqCh

worker/restore_map.go:291–647  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

289}
290
291func (m *mapper) processReqCh(ctx context.Context) error {
292 buf := z.NewBuffer(20<<20, "processKVList")
293 defer func() {
294 if err := buf.Release(); err != nil {
295 glog.Warningf("error in releasing buffer: %v", err)
296 }
297 }()
298
299 maxNs := uint64(0)
300 maxUid := uint64(0)
301
302 toBuffer := func(kv *bpb.KV, version uint64) error {
303 key := y.KeyWithTs(kv.Key, version)
304 sz := proto.Size(kv)
305 b := buf.SliceAllocate(2 + len(key) + sz)
306
307 binary.BigEndian.PutUint16(b[0:2], uint16(len(key)))
308 x.AssertTrue(copy(b[2:], key) == len(key))
309 _, err := x.MarshalToSizedBuffer(b[2+len(key):], kv)
310 return err
311 }
312
313 processKV := func(in *loadBackupInput, kv *bpb.KV) error {
314 if len(kv.GetUserMeta()) != 1 {
315 return errors.Errorf(
316 "Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
317 }
318
319 restoreKey, ns, err := fromBackupKey(kv.Key)
320 if err != nil {
321 return errors.Wrap(err, "fromBackupKey")
322 }
323
324 // For a namespace-aware restore, skip any key whose namespace differs from
325 // fromNamespace; such keys are not processed any further.
326 if in.isNamespaceAwareRestore && ns != in.fromNamespace {
327 return nil
328 }
329
330 // Filter keys using the preds set. Do not do this filtering for type keys
331 // as they are meant to be in every group and their Attr value does not
332 // match a predicate name.
333 parsedKey, err := x.Parse(restoreKey)
334 if err != nil {
335 return errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey))
336 }
337
338 // Update the local max uid and max namespace values.
339 maxUid = x.Max(maxUid, parsedKey.Uid)
340 if in.isNamespaceAwareRestore {
341 maxNs = x.Max(maxNs, 0)
342 } else {
343 maxNs = x.Max(maxNs, ns)
344 }
345
346 if !in.keepSchema && (parsedKey.IsSchema() || parsedKey.IsType()) {
347 return nil
348 }

Callers 1

RunMapperFunction · 0.95

Calls 15

RollupMethod · 0.95
sendForWritingMethod · 0.95
AssertTrueFunction · 0.92
MarshalToSizedBufferFunction · 0.92
ParseFunction · 0.92
MaxFunction · 0.92
ParseNamespaceAttrFunction · 0.92
NamespaceAttrFunction · 0.92
FromBackupKeyFunction · 0.92
FromBackupPostingListFunction · 0.92
FreePackFunction · 0.92
MarshalPostingListFunction · 0.92

Tested by

no test coverage detected