We create MAP files, each of a limited size, and write sorted data into them. We may end up creating many such files. Then we take all of these MAP files, read part of the data from each file, sort all of this data, and then use the stream writer to write the sorted data into the pstore Badger. We store some s
(req *pb.RestoreRequest, mapDir string)
| 736 | // keys, meaning from one partition key to the next partition key. I am not sure if there is a |
| 737 | // value in having these partition keys. Maybe, we can live without them. |
| 738 | func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) { |
| 739 | uri, err := url.Parse(req.Location) |
| 740 | if err != nil { |
| 741 | return nil, err |
| 742 | } |
| 743 | if req.RestoreTs == 0 { |
| 744 | return nil, errors.New("RestoreRequest must have a valid restoreTs") |
| 745 | } |
| 746 | |
| 747 | creds := getCredentialsFromRestoreRequest(req) |
| 748 | h, err := NewUriHandler(uri, creds) |
| 749 | if err != nil { |
| 750 | return nil, err |
| 751 | } |
| 752 | |
| 753 | manifests, err := getManifestsToRestore(h, uri, req) |
| 754 | if err != nil { |
| 755 | return nil, errors.Wrapf(err, "cannot retrieve manifests") |
| 756 | } |
| 757 | glog.Infof("Got %d backups to restore ", len(manifests)) |
| 758 | |
| 759 | cfg, err := getEncConfig(req) |
| 760 | if err != nil { |
| 761 | return nil, errors.Wrapf(err, "unable to get encryption config") |
| 762 | } |
| 763 | keys, err := x.GetEncAclKeys(cfg) |
| 764 | if err != nil { |
| 765 | return nil, errors.Wrapf(err, "unable to get encryption keys") |
| 766 | } |
| 767 | |
| 768 | mapper := &mapper{ |
| 769 | buf: z.NewBuffer(mapFileSz, "Restore.Buffer"), |
| 770 | thr: y.NewThrottle(3), |
| 771 | bufLock: &sync.Mutex{}, |
| 772 | closer: z.NewCloser(1), |
| 773 | reqCh: make(chan listReq, 3), |
| 774 | restoreTs: req.RestoreTs, |
| 775 | mapDir: mapDir, |
| 776 | szHist: z.NewHistogramData(z.HistogramBounds(10, 32)), |
| 777 | } |
| 778 | |
| 779 | numGo := 8 |
| 780 | g, ctx := errgroup.WithContext(mapper.closer.Ctx()) |
| 781 | for range numGo { |
| 782 | g.Go(func() error { |
| 783 | return mapper.processReqCh(ctx) |
| 784 | }) |
| 785 | } |
| 786 | go mapper.Progress() |
| 787 | defer func() { |
| 788 | if err := mapper.Flush(); err != nil { |
| 789 | glog.Warningf("error calling flush during map: %v", err) |
| 790 | } |
| 791 | mapper.closer.SignalAndWait() |
| 792 | }() |
| 793 | |
| 794 | dropAll := false |
| 795 | dropAttr := make(map[string]struct{}) |
no test coverage detected