MCPcopy Index your code
hub / github.com/dgraph-io/dgraph / addVectorEntry

Method addVectorEntry

dgraph/cmd/bulk/vector_indexer.go:366–440  ·  view source on GitHub ↗

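addVectorEntry processes a single vector entry by inserting it into the HNSW index. It is the entry point called from the reduce phase. Before inserting, it validates the entry: it rejects nil entries, rejects UIDs of 0 or MaxUint64, and checks the vector dimension. It then lazily creates the indexer for the entry's predicate. Errors are reported through handleError and the entry is skipped, rather than aborting the run.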

(ve *vectorEntry)

Source from the content-addressed store, hash-verified

364// This is the entry point called from reduce phase.
365// It validates the vector dimension, creates indexer lazily, and handles errors gracefully.
366func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) {
367 if ve == nil {
368 vi.handleError(
369 fmt.Errorf("received nil vectorEntry"),
370 "nil_entry",
371 "<vector_indexer>",
372 )
373 return
374 }
375
376 if ve.uid == 0 {
377 vi.handleError(
378 fmt.Errorf("invalid UID=0 for predicate %s", ve.pred),
379 fmt.Sprintf("predicate=%s uid=0", ve.pred),
380 "<vector_validation>",
381 )
382 return
383 }
384 if ve.uid == math.MaxUint64 {
385 vi.handleError(
386 fmt.Errorf("invalid UID=MaxUint64 for predicate %s", ve.pred),
387 fmt.Sprintf("predicate=%s uid=MaxUint64", ve.pred),
388 "<vector_validation>",
389 )
390 return
391 }
392
393 // Validate vector dimension
394 if !vi.validateVectorDimension(ve.pred, ve.vector, ve.uid) {
395 return
396 }
397
398 indexer, tc, err := vi.getOrCreateIndexer(ve.pred)
399 if err != nil {
400 vi.handleError(err,
401 fmt.Sprintf("predicate=%s uid=%d", ve.pred, ve.uid),
402 "<vector_indexer>",
403 )
404 return
405 }
406
407 // Track predicate → shard mapping (for copy phase)
408 // INVARIANT: Each predicate should only appear in ONE shard
409 if vi.predToShardMu != nil && vi.predToOutputShard != nil {
410 vi.predToShardMu.Lock()
411 if existingShard, exists := vi.predToOutputShard[ve.pred]; exists {
412 if existingShard != vi.shardId {
413 // This is a serious invariant violation - same predicate in multiple shards
414 // This could lead to data corruption as HNSW graph would be split
415 vi.predToShardMu.Unlock()
416 vi.handleError(
417 fmt.Errorf("predicate %s already assigned to shard %d, but shard %d also received vectors",
418 ve.pred, existingShard, vi.shardId),
419 fmt.Sprintf("predicate=%s existing_shard=%d current_shard=%d", ve.pred, existingShard, vi.shardId),
420 "<vector_invariant>",
421 )
422 return // Skip this vector to prevent further corruption
423 }

Callers 1

startWriting (method) · 0.80

Calls 8

handleError (method) · 0.95
getOrCreateIndexer (method) · 0.95
Infof (method) · 0.80
Insert (method) · 0.65
Errorf (method) · 0.45
Lock (method) · 0.45
Unlock (method) · 0.45

Tested by

no test coverage detected