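addVectorEntry processes a single vector entry by inserting it into the HNSW index. It is the entry point called from the reduce phase. It rejects nil entries and invalid UIDs (0 and MaxUint64), validates the vector dimension, lazily creates the per-predicate indexer, and reports any failure through handleError instead of returning an error.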
(ve *vectorEntry)
| 364 | // This is the entry point called from reduce phase. |
| 365 | // It validates the vector dimension, creates indexer lazily, and handles errors gracefully. |
| 366 | func (vi *vectorIndexer) addVectorEntry(ve *vectorEntry) { |
| 367 | if ve == nil { |
| 368 | vi.handleError( |
| 369 | fmt.Errorf("received nil vectorEntry"), |
| 370 | "nil_entry", |
| 371 | "<vector_indexer>", |
| 372 | ) |
| 373 | return |
| 374 | } |
| 375 | |
| 376 | if ve.uid == 0 { |
| 377 | vi.handleError( |
| 378 | fmt.Errorf("invalid UID=0 for predicate %s", ve.pred), |
| 379 | fmt.Sprintf("predicate=%s uid=0", ve.pred), |
| 380 | "<vector_validation>", |
| 381 | ) |
| 382 | return |
| 383 | } |
| 384 | if ve.uid == math.MaxUint64 { |
| 385 | vi.handleError( |
| 386 | fmt.Errorf("invalid UID=MaxUint64 for predicate %s", ve.pred), |
| 387 | fmt.Sprintf("predicate=%s uid=MaxUint64", ve.pred), |
| 388 | "<vector_validation>", |
| 389 | ) |
| 390 | return |
| 391 | } |
| 392 | |
| 393 | // Validate vector dimension |
| 394 | if !vi.validateVectorDimension(ve.pred, ve.vector, ve.uid) { |
| 395 | return |
| 396 | } |
| 397 | |
| 398 | indexer, tc, err := vi.getOrCreateIndexer(ve.pred) |
| 399 | if err != nil { |
| 400 | vi.handleError(err, |
| 401 | fmt.Sprintf("predicate=%s uid=%d", ve.pred, ve.uid), |
| 402 | "<vector_indexer>", |
| 403 | ) |
| 404 | return |
| 405 | } |
| 406 | |
| 407 | // Track predicate → shard mapping (for copy phase) |
| 408 | // INVARIANT: Each predicate should only appear in ONE shard |
| 409 | if vi.predToShardMu != nil && vi.predToOutputShard != nil { |
| 410 | vi.predToShardMu.Lock() |
| 411 | if existingShard, exists := vi.predToOutputShard[ve.pred]; exists { |
| 412 | if existingShard != vi.shardId { |
| 413 | // This is a serious invariant violation - same predicate in multiple shards |
| 414 | // This could lead to data corruption as HNSW graph would be split |
| 415 | vi.predToShardMu.Unlock() |
| 416 | vi.handleError( |
| 417 | fmt.Errorf("predicate %s already assigned to shard %d, but shard %d also received vectors", |
| 418 | ve.pred, existingShard, vi.shardId), |
| 419 | fmt.Sprintf("predicate=%s existing_shard=%d current_shard=%d", ve.pred, existingShard, vi.shardId), |
| 420 | "<vector_invariant>", |
| 421 | ) |
| 422 | return // Skip this vector to prevent further corruption |
| 423 | } |
no test coverage detected