streamSnapshot takes a base directory and a set of group IDs and streams each group's p directory to the corresponding group. It does not scan the base directory: for every group ID g it expects the data at `<baseDir>/<g-1>/p` (the subdirectory name is the group ID minus one) and fails if that path cannot be stat'ed.
(ctx context.Context, dc api.DgraphClient, baseDir string, groups []uint32)
| 110 | // p directory to the corresponding group IDs. It first scans the provided directory for |
| 111 | // subdirectories named with numeric group IDs. |
| 112 | func streamSnapshot(ctx context.Context, dc api.DgraphClient, baseDir string, groups []uint32) error { |
| 113 | glog.Infof("[import] Starting to stream snapshot from directory: %s", baseDir) |
| 114 | |
| 115 | errG, errGrpCtx := errgroup.WithContext(ctx) |
| 116 | for _, group := range groups { |
| 117 | errG.Go(func() error { |
| 118 | pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p") |
| 119 | if _, err := os.Stat(pDir); err != nil { |
| 120 | return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir) |
| 121 | } |
| 122 | glog.Infof("[import] Streaming data for group [%d] from directory: [%s]", group, pDir) |
| 123 | if err := streamSnapshotForGroup(errGrpCtx, dc, pDir, group); err != nil { |
| 124 | glog.Errorf("[import] Failed to stream data for group [%v] from directory: [%s]: %v", group, pDir, err) |
| 125 | return err |
| 126 | } |
| 127 | |
| 128 | return nil |
| 129 | }) |
| 130 | } |
| 131 | |
| 132 | if err := errG.Wait(); err != nil { |
| 133 | glog.Errorf("[import] failed to stream external snapshot: %v", err) |
| 134 | // If errors occurs during streaming of the external snapshot, we drop all the data and |
| 135 | // go back to ensure a clean slate and the cluster remains in working state. |
| 136 | glog.Info("[import] dropping all the data and going back to clean slate") |
| 137 | req := &api.UpdateExtSnapshotStreamingStateRequest{ |
| 138 | Start: false, |
| 139 | Finish: true, |
| 140 | DropData: true, |
| 141 | } |
| 142 | if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil { |
| 143 | return fmt.Errorf("failed to turn off drain mode: %v", err) |
| 144 | } |
| 145 | |
| 146 | glog.Info("[import] successfully disabled drain mode") |
| 147 | return err |
| 148 | } |
| 149 | |
| 150 | glog.Info("[import] Completed streaming external snapshot") |
| 151 | req := &api.UpdateExtSnapshotStreamingStateRequest{ |
| 152 | Start: false, |
| 153 | Finish: true, |
| 154 | DropData: false, |
| 155 | } |
| 156 | if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil { |
| 157 | glog.Errorf("[import] failed to disable drain mode: %v", err) |
| 158 | return fmt.Errorf("failed to disable drain mode: %v", err) |
| 159 | } |
| 160 | glog.Info("[import] successfully disable drain mode") |
| 161 | return nil |
| 162 | } |
| 163 | |
| 164 | // streamSnapshotForGroup handles the actual data streaming process for a single group. |
| 165 | // It opens the BadgerDB at the specified directory and streams all data to the server. |
no test coverage detected