| 483 | } |
| 484 | |
| 485 | func (r *reducer) writeSplitLists(db, tmpDb *badger.DB, writer *badger.StreamWriter) { |
| 486 | // baseStreamId is the max ID seen while writing non-split lists. |
| 487 | baseStreamId := atomic.AddUint32(&r.streamId, 1) |
| 488 | stream := tmpDb.NewStreamAt(math.MaxUint64) |
| 489 | stream.LogPrefix = "copying split keys to main DB" |
| 490 | stream.Send = func(buf *z.Buffer) error { |
| 491 | kvs, err := badger.BufferToKVList(buf) |
| 492 | x.Check(err) |
| 493 | |
| 494 | buf.Reset() |
| 495 | for _, kv := range kvs.Kv { |
| 496 | kv.StreamId += baseStreamId |
| 497 | badger.KVToBuffer(kv, buf) |
| 498 | } |
| 499 | x.Check(writer.Write(buf)) |
| 500 | return nil |
| 501 | } |
| 502 | x.Check(stream.Orchestrate(context.Background())) |
| 503 | } |
| 504 | |
| 505 | // copyVectorDataToShards copies vector data from the shared vectorTmpDb to the correct output DBs. |
| 506 | // It uses the predToOutputShard map to determine which predicate goes to which shard. |