MCPcopy
hub / github.com/dgraph-io/dgraph / process

Method process

worker/restore_reduce.go:276–318  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

274}
275
276func (r *reducer) process() error {
277 if r.w == nil {
278 return nil
279 }
280 writer := r.w
281
282 kvBuf := z.NewBuffer(64<<20, "Restore.GetBuf")
283 defer func() {
284 if ierr := kvBuf.Release(); ierr != nil {
285 glog.Warningf("error in releasing buffer: %v", ierr)
286 }
287 }()
288
289 var lastKey []byte
290 for cbuf := range r.bufferCh {
291 err := cbuf.SliceIterate(func(s []byte) error {
292 me := mapEntry(s)
293 key := me.Key()
294
295 // Don't need to pick multiple versions of the same key.
296 if y.SameKey(key, lastKey) {
297 return nil
298 }
299 lastKey = append(lastKey[:0], key...)
300
301 kvBuf.WriteSlice(me.Data())
302 return nil
303 })
304 if err != nil {
305 return err
306 }
307
308 atomic.AddUint64(&r.bytesProcessed, uint64(cbuf.LenNoPadding()))
309 if err := writer.Write(kvBuf); err != nil {
310 return err
311 }
312 kvBuf.Reset()
313 if err := cbuf.Release(); err != nil {
314 glog.Warningf("error in releasing buffer: %v", err)
315 }
316 } // end loop for bufferCh
317 return nil
318}

Callers 1

ReduceMethod · 0.95

Calls 7

mapEntryTypeAlias · 0.85
ReleaseMethod · 0.80
WarningfMethod · 0.80
DataMethod · 0.80
WriteMethod · 0.65
KeyMethod · 0.45
ResetMethod · 0.45

Tested by

no test coverage detected