wait waits for all vector indexing operations to complete. It flushes any remaining writes and commits all HNSW mutations from LocalCache to vectorTmpDb.
()
| 442 | // wait waits for all vector indexing operations to complete. |
| 443 | // It flushes any remaining writes and commits all HNSW mutations from LocalCache to vectorTmpDb. |
| 444 | func (vi *vectorIndexer) wait() { |
| 445 | // Flush any remaining vector posting list writes |
| 446 | if err := vi.flushWriteBatch(); err != nil { |
| 447 | vi.handleError(err, fmt.Sprintf("shard=%d", vi.shardId), "<vector_flush>") |
| 448 | } |
| 449 | |
| 450 | vi.mu.Lock() |
| 451 | defer vi.mu.Unlock() |
| 452 | |
| 453 | // If no indexers were created (no vectors arrived), nothing to commit |
| 454 | if len(vi.txns) == 0 { |
| 455 | glog.Infof("Vector indexer wait completed for shard %d (no vectors processed)", vi.shardId) |
| 456 | return |
| 457 | } |
| 458 | |
| 459 | // Commit HNSW mutations from LocalCache to vectorTmpDb |
| 460 | // This is critical - without this, HNSW edge data stays in memory and never gets persisted |
| 461 | // Step 1: Call Update() to move mutations from plists to deltas |
| 462 | // Step 2: Call CommitToDisk() to write deltas to BadgerDB |
| 463 | writer := posting.NewTxnWriter(vi.tmpDb) |
| 464 | for pred, txn := range vi.txns { |
| 465 | // Update moves mutations from posting lists to delta cache |
| 466 | txn.Update() |
| 467 | if err := txn.CommitToDisk(writer, vi.state.writeTs); err != nil { |
| 468 | vi.handleError(err, |
| 469 | fmt.Sprintf("predicate=%s shard=%d", pred, vi.shardId), |
| 470 | "<vector_commit>", |
| 471 | ) |
| 472 | } else { |
| 473 | glog.Infof("Committed HNSW data for predicate %s to tmpDb (shard %d)", pred, vi.shardId) |
| 474 | } |
| 475 | } |
| 476 | if err := writer.Flush(); err != nil { |
| 477 | vi.handleError(err, fmt.Sprintf("shard=%d", vi.shardId), "<vector_commit>") |
| 478 | } |
| 479 | |
| 480 | glog.Infof("Vector indexer wait completed for shard %d with %d predicates", vi.shardId, len(vi.vectorPreds)) |
| 481 | } |
no test coverage detected