MCPcopy
hub / github.com/dgraph-io/dgraph / startWriting

Method startWriting

dgraph/cmd/bulk/reduce.go:363–483  ·  view source on GitHub ↗
(ci *countIndexer, vi *vectorIndexer, writerCh chan *encodeRequest, closer *z.Closer)

Source from the content-addressed store, hash-verified

361}
362
363func (r *reducer) startWriting(ci *countIndexer, vi *vectorIndexer, writerCh chan *encodeRequest, closer *z.Closer) {
364 defer closer.Done()
365
366 // Concurrently write split lists to a temporary badger.
367 tmpWg := new(sync.WaitGroup)
368 tmpWg.Add(1)
369 go r.writeTmpSplits(ci, tmpWg)
370
371 count := func(req *encodeRequest) {
372 defer func() {
373 if err := req.countBuf.Release(); err != nil {
374 glog.Warningf("error in releasing buffer: %v", err)
375 }
376 }()
377 if req.countBuf.IsEmpty() {
378 return
379 }
380
381 // req.countBuf is already sorted.
382 sz := req.countBuf.LenNoPadding()
383 ci.countBuf.Grow(sz)
384
385 if err := req.countBuf.SliceIterate(func(slice []byte) error {
386 ce := countEntry(slice)
387 ci.addCountEntry(ce)
388 return nil
389 }); err != nil {
390 glog.Errorf("error while iterating over buf: %v", err)
391 x.Check(err)
392 }
393 }
394
395 // Process vector entries and insert into HNSW index
396 vector := func(req *encodeRequest) {
397 if req.vectorBuf == nil {
398 return
399 }
400 defer func() {
401 if err := req.vectorBuf.Release(); err != nil {
402 glog.Warningf("error releasing vector buffer: %v", err)
403 }
404 }()
405 if req.vectorBuf.IsEmpty() {
406 return
407 }
408
409 // Iterate through vector entries and insert into HNSW
410 if err := req.vectorBuf.SliceIterate(func(slice []byte) error {
411 ve := unmarshalVectorEntry(slice)
412 if ve == nil {
413 vi.handleUnmarshalError()
414 return nil
415 }
416 // Insert vector into HNSW and generate entries
417 vi.addVectorEntry(ve)
418 return nil
419 }); err != nil {
420 glog.Errorf("error processing vectors: %v", err)

Callers 1

reduce · Method · 0.95

Calls 15

writeTmpSplits · Method · 0.95
Check · Function · 0.92
countEntry · TypeAlias · 0.85
unmarshalVectorEntry · Function · 0.85
write · Function · 0.85
Release · Method · 0.80
Warningf · Method · 0.80
addCountEntry · Method · 0.80
handleUnmarshalError · Method · 0.80
addVectorEntry · Method · 0.80
Wait · Method · 0.80
flushWriteBatch · Method · 0.80

Tested by

no test coverage detected