MCPcopy
hub / github.com/dgraph-io/dgraph / RunMapper

Function RunMapper

worker/restore_map.go:738–915  ·  view source on GitHub ↗

We create MAP files, each of a limited size, and write sorted data into them. We may end up creating many such files. We then take all of these MAP files, read part of the data from each file, sort all of this data, and use streamwriter to write the sorted data into pstore badger. We store some s…

(req *pb.RestoreRequest, mapDir string)

Source from the content-addressed store, hash-verified

736// keys, meaning from one partition key to the next partition key. I am not sure if there is a
737// value in having these partition keys. Maybe we can live without them.
738func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
739 uri, err := url.Parse(req.Location)
740 if err != nil {
741 return nil, err
742 }
743 if req.RestoreTs == 0 {
744 return nil, errors.New("RestoreRequest must have a valid restoreTs")
745 }
746
747 creds := getCredentialsFromRestoreRequest(req)
748 h, err := NewUriHandler(uri, creds)
749 if err != nil {
750 return nil, err
751 }
752
753 manifests, err := getManifestsToRestore(h, uri, req)
754 if err != nil {
755 return nil, errors.Wrapf(err, "cannot retrieve manifests")
756 }
757 glog.Infof("Got %d backups to restore ", len(manifests))
758
759 cfg, err := getEncConfig(req)
760 if err != nil {
761 return nil, errors.Wrapf(err, "unable to get encryption config")
762 }
763 keys, err := x.GetEncAclKeys(cfg)
764 if err != nil {
765 return nil, errors.Wrapf(err, "unable to get encryption keys")
766 }
767
768 mapper := &mapper{
769 buf: z.NewBuffer(mapFileSz, "Restore.Buffer"),
770 thr: y.NewThrottle(3),
771 bufLock: &sync.Mutex{},
772 closer: z.NewCloser(1),
773 reqCh: make(chan listReq, 3),
774 restoreTs: req.RestoreTs,
775 mapDir: mapDir,
776 szHist: z.NewHistogramData(z.HistogramBounds(10, 32)),
777 }
778
779 numGo := 8
780 g, ctx := errgroup.WithContext(mapper.closer.Ctx())
781 for range numGo {
782 g.Go(func() error {
783 return mapper.processReqCh(ctx)
784 })
785 }
786 go mapper.Progress()
787 defer func() {
788 if err := mapper.Flush(); err != nil {
789 glog.Warningf("error calling flush during map: %v", err)
790 }
791 mapper.closer.SignalAndWait()
792 }()
793
794 dropAll := false
795 dropAttr := make(map[string]struct{})

Callers 3

runExportBackupFunction · 0.92
handleRestoreProposalFunction · 0.85
RunOfflineRestoreFunction · 0.85

Calls 15

processReqChMethod · 0.95
ProgressMethod · 0.95
FlushMethod · 0.95
MapMethod · 0.95
GetEncAclKeysFunction · 0.92
MaxFunction · 0.92
NewUriHandlerFunction · 0.85
getManifestsToRestoreFunction · 0.85
getEncConfigFunction · 0.85
readerFromFunction · 0.85
InfofMethod · 0.80

Tested by

no test coverage detected