(ci *countIndexer, wg *sync.WaitGroup)
// maxSplitBatchLen is the largest number of KVs handed to the split writer in
// a single WriteList call, and also the number of written KVs after which the
// split write batch is flushed and replaced.
const maxSplitBatchLen = 1000
| 331 | |
| 332 | func (r *reducer) writeTmpSplits(ci *countIndexer, wg *sync.WaitGroup) { |
| 333 | defer wg.Done() |
| 334 | splitBatchLen := 0 |
| 335 | |
| 336 | for kvs := range ci.splitCh { |
| 337 | if kvs == nil || len(kvs.Kv) == 0 { |
| 338 | continue |
| 339 | } |
| 340 | |
| 341 | for i := 0; i < len(kvs.Kv); i += maxSplitBatchLen { |
| 342 | // flush the write batch when the max batch length is reached to prevent the |
| 343 | // value log from growing over the allowed limit. |
| 344 | if splitBatchLen >= maxSplitBatchLen { |
| 345 | x.Check(ci.splitWriter.Flush()) |
| 346 | ci.splitWriter = ci.tmpDb.NewManagedWriteBatch() |
| 347 | splitBatchLen = 0 |
| 348 | } |
| 349 | |
| 350 | batch := &bpb.KVList{} |
| 351 | if i+maxSplitBatchLen >= len(kvs.Kv) { |
| 352 | batch.Kv = kvs.Kv[i:] |
| 353 | } else { |
| 354 | batch.Kv = kvs.Kv[i : i+maxSplitBatchLen] |
| 355 | } |
| 356 | splitBatchLen += len(batch.Kv) |
| 357 | x.Check(ci.splitWriter.WriteList(batch)) |
| 358 | } |
| 359 | } |
| 360 | x.Check(ci.splitWriter.Flush()) |
| 361 | } |
| 362 | |
| 363 | func (r *reducer) startWriting(ci *countIndexer, vi *vectorIndexer, writerCh chan *encodeRequest, closer *z.Closer) { |
| 364 | defer closer.Done() |
no test coverage detected