MCPcopy Index your code
hub / github.com/dgraph-io/dgraph / writeTmpSplits

Method writeTmpSplits

dgraph/cmd/bulk/reduce.go:332–361  ·  view source on GitHub ↗
(ci *countIndexer, wg *sync.WaitGroup)

Source from the content-addressed store, hash-verified

// maxSplitBatchLen bounds split writes in writeTmpSplits: it is both the
// maximum number of KVs handed to a single WriteList call and the threshold of
// accumulated KVs at which the current write batch is flushed and replaced.
330const maxSplitBatchLen = 1000
331
332func (r *reducer) writeTmpSplits(ci *countIndexer, wg *sync.WaitGroup) {
333 defer wg.Done()
334 splitBatchLen := 0
335
336 for kvs := range ci.splitCh {
337 if kvs == nil || len(kvs.Kv) == 0 {
338 continue
339 }
340
341 for i := 0; i < len(kvs.Kv); i += maxSplitBatchLen {
342 // flush the write batch when the max batch length is reached to prevent the
343 // value log from growing over the allowed limit.
344 if splitBatchLen >= maxSplitBatchLen {
345 x.Check(ci.splitWriter.Flush())
346 ci.splitWriter = ci.tmpDb.NewManagedWriteBatch()
347 splitBatchLen = 0
348 }
349
350 batch := &bpb.KVList{}
351 if i+maxSplitBatchLen >= len(kvs.Kv) {
352 batch.Kv = kvs.Kv[i:]
353 } else {
354 batch.Kv = kvs.Kv[i : i+maxSplitBatchLen]
355 }
356 splitBatchLen += len(batch.Kv)
357 x.Check(ci.splitWriter.WriteList(batch))
358 }
359 }
360 x.Check(ci.splitWriter.Flush())
361}
362
363func (r *reducer) startWriting(ci *countIndexer, vi *vectorIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
364 defer closer.Done()

Callers 1

startWriting (Method) · 0.95

Calls 3

Check (Function) · 0.92
Flush (Method) · 0.65
Done (Method) · 0.45

Tested by

no test coverage detected