(ci *countIndexer, vi *vectorIndexer, writerCh chan *encodeRequest, closer *z.Closer)
| 361 | } |
| 362 | |
| 363 | func (r *reducer) startWriting(ci *countIndexer, vi *vectorIndexer, writerCh chan *encodeRequest, closer *z.Closer) { |
| 364 | defer closer.Done() |
| 365 | |
| 366 | // Concurrently write split lists to a temporary badger. |
| 367 | tmpWg := new(sync.WaitGroup) |
| 368 | tmpWg.Add(1) |
| 369 | go r.writeTmpSplits(ci, tmpWg) |
| 370 | |
| 371 | count := func(req *encodeRequest) { |
| 372 | defer func() { |
| 373 | if err := req.countBuf.Release(); err != nil { |
| 374 | glog.Warningf("error in releasing buffer: %v", err) |
| 375 | } |
| 376 | }() |
| 377 | if req.countBuf.IsEmpty() { |
| 378 | return |
| 379 | } |
| 380 | |
| 381 | // req.countBuf is already sorted. |
| 382 | sz := req.countBuf.LenNoPadding() |
| 383 | ci.countBuf.Grow(sz) |
| 384 | |
| 385 | if err := req.countBuf.SliceIterate(func(slice []byte) error { |
| 386 | ce := countEntry(slice) |
| 387 | ci.addCountEntry(ce) |
| 388 | return nil |
| 389 | }); err != nil { |
| 390 | glog.Errorf("error while iterating over buf: %v", err) |
| 391 | x.Check(err) |
| 392 | } |
| 393 | } |
| 394 | |
| 395 | // Process vector entries and insert into HNSW index |
| 396 | vector := func(req *encodeRequest) { |
| 397 | if req.vectorBuf == nil { |
| 398 | return |
| 399 | } |
| 400 | defer func() { |
| 401 | if err := req.vectorBuf.Release(); err != nil { |
| 402 | glog.Warningf("error releasing vector buffer: %v", err) |
| 403 | } |
| 404 | }() |
| 405 | if req.vectorBuf.IsEmpty() { |
| 406 | return |
| 407 | } |
| 408 | |
| 409 | // Iterate through vector entries and insert into HNSW |
| 410 | if err := req.vectorBuf.SliceIterate(func(slice []byte) error { |
| 411 | ve := unmarshalVectorEntry(slice) |
| 412 | if ve == nil { |
| 413 | vi.handleUnmarshalError() |
| 414 | return nil |
| 415 | } |
| 416 | // Insert vector into HNSW and generate entries |
| 417 | vi.addVectorEntry(ve) |
| 418 | return nil |
| 419 | }); err != nil { |
| 420 | glog.Errorf("error processing vectors: %v", err) |
no test coverage detected