MCPcopy
hub / github.com/dgraph-io/dgraph / streamSnapshot

Function streamSnapshot

dgraph/cmd/dgraphimport/import_client.go:112–162  ·  view source on GitHub ↗

streamSnapshot takes a p directory and a set of group IDs and streams the data from the p directory to the corresponding group IDs. It first scans the provided directory for subdirectories named with numeric group IDs.

(ctx context.Context, dc api.DgraphClient, baseDir string, groups []uint32)

Source from the content-addressed store, hash-verified

110// p directory to the corresponding group IDs. It first scans the provided directory for
111// subdirectories named with numeric group IDs.
112func streamSnapshot(ctx context.Context, dc api.DgraphClient, baseDir string, groups []uint32) error {
113 glog.Infof("[import] Starting to stream snapshot from directory: %s", baseDir)
114
115 errG, errGrpCtx := errgroup.WithContext(ctx)
116 for _, group := range groups {
117 errG.Go(func() error {
118 pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p")
119 if _, err := os.Stat(pDir); err != nil {
120 return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
121 }
122 glog.Infof("[import] Streaming data for group [%d] from directory: [%s]", group, pDir)
123 if err := streamSnapshotForGroup(errGrpCtx, dc, pDir, group); err != nil {
124 glog.Errorf("[import] Failed to stream data for group [%v] from directory: [%s]: %v", group, pDir, err)
125 return err
126 }
127
128 return nil
129 })
130 }
131
132 if err := errG.Wait(); err != nil {
133 glog.Errorf("[import] failed to stream external snapshot: %v", err)
134 // If errors occurs during streaming of the external snapshot, we drop all the data and
135 // go back to ensure a clean slate and the cluster remains in working state.
136 glog.Info("[import] dropping all the data and going back to clean slate")
137 req := &api.UpdateExtSnapshotStreamingStateRequest{
138 Start: false,
139 Finish: true,
140 DropData: true,
141 }
142 if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
143 return fmt.Errorf("failed to turn off drain mode: %v", err)
144 }
145
146 glog.Info("[import] successfully disabled drain mode")
147 return err
148 }
149
150 glog.Info("[import] Completed streaming external snapshot")
151 req := &api.UpdateExtSnapshotStreamingStateRequest{
152 Start: false,
153 Finish: true,
154 DropData: false,
155 }
156 if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
157 glog.Errorf("[import] failed to disable drain mode: %v", err)
158 return fmt.Errorf("failed to disable drain mode: %v", err)
159 }
160 glog.Info("[import] successfully disable drain mode")
161 return nil
162}
163
164// streamSnapshotForGroup handles the actual data streaming process for a single group.
165// It opens the BadgerDB at the specified directory and streams all data to the server.

Callers 1

ImportFunction · 0.85

Calls 6

streamSnapshotForGroupFunction · 0.85
InfofMethod · 0.80
WaitMethod · 0.80
InfoMethod · 0.80
ErrorfMethod · 0.45

Tested by

no test coverage detected