TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples. In total this produces 4000 chunks, because with a step of 15s there are 4 chunks per block per series. While appending the samples to the head, it concurrently queries them from multiple goroutines and verifies that the returned results are correct.
(t *testing.T)
| 597 | // While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the |
| 598 | // returned results are correct. |
| 599 | func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { |
| 600 | for _, appV2 := range []bool{false, true} { |
| 601 | t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) { |
| 602 | head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) |
| 603 | |
| 604 | seriesCnt := 1000 |
| 605 | readConcurrency := 2 |
| 606 | writeConcurrency := 10 |
| 607 | startTs := uint64(DefaultBlockDuration) // Start at the second block relative to the unix epoch. |
| 608 | qryRange := uint64(5 * time.Minute.Milliseconds()) |
| 609 | step := uint64(15 * time.Second / time.Millisecond) |
| 610 | endTs := startTs + uint64(DefaultBlockDuration) |
| 611 | |
| 612 | labelSets := make([]labels.Labels, seriesCnt) |
| 613 | for i := range seriesCnt { |
| 614 | labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i)) |
| 615 | } |
| 616 | require.NoError(t, head.Init(0)) |
| 617 | |
| 618 | g, ctx := errgroup.WithContext(t.Context()) |
| 619 | whileNotCanceled := func(f func() (bool, error)) error { |
| 620 | for ctx.Err() == nil { |
| 621 | cont, err := f() |
| 622 | if err != nil { |
| 623 | return err |
| 624 | } |
| 625 | if !cont { |
| 626 | return nil |
| 627 | } |
| 628 | } |
| 629 | return nil |
| 630 | } |
| 631 | |
| 632 | // Create one channel for each write worker, the channels will be used by the coordinator |
| 633 | // go routine to coordinate which timestamps each write worker has to write. |
| 634 | writerTsCh := make([]chan uint64, writeConcurrency) |
| 635 | for writerTsChIdx := range writerTsCh { |
| 636 | writerTsCh[writerTsChIdx] = make(chan uint64) |
| 637 | } |
| 638 | |
| 639 | // workerReadyWg is used to synchronize the start of the test, |
| 640 | // we only start the test once all workers signal that they're ready. |
| 641 | var workerReadyWg sync.WaitGroup |
| 642 | workerReadyWg.Add(writeConcurrency + readConcurrency) |
| 643 | |
| 644 | // Start the write workers. |
| 645 | for wid := range writeConcurrency { |
| 646 | // Create copy of workerID to be used by worker routine. |
| 647 | workerID := wid |
| 648 | |
| 649 | g.Go(func() error { |
| 650 | // The label sets which this worker will write. |
| 651 | workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)] |
| 652 | |
| 653 | // Signal that this worker is ready. |
| 654 | workerReadyWg.Done() |
| 655 | |
| 656 | return whileNotCanceled(func() (bool, error) { |
nothing calls this directly
no test coverage detected
searching dependent graphs…