MCPcopy
hub / github.com/prometheus/prometheus / TestHead_HighConcurrencyReadAndWrite

Function TestHead_HighConcurrencyReadAndWrite

tsdb/head_test.go:599–797  ·  view source on GitHub ↗

TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples; in total it generates 4000 chunks, because with a step of 15s there are 4 chunks per block per series. While appending the samples to the head it concurrently queries them from multiple goroutines and verifies that the returned results are correct.

(t *testing.T)

Source from the content-addressed store, hash-verified

597// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the
598// returned results are correct.
599func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
600 for _, appV2 := range []bool{false, true} {
601 t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) {
602 head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
603
604 seriesCnt := 1000
605 readConcurrency := 2
606 writeConcurrency := 10
607 startTs := uint64(DefaultBlockDuration) // Start at the second block relative to the unix epoch.
608 qryRange := uint64(5 * time.Minute.Milliseconds())
609 step := uint64(15 * time.Second / time.Millisecond)
610 endTs := startTs + uint64(DefaultBlockDuration)
611
612 labelSets := make([]labels.Labels, seriesCnt)
613 for i := range seriesCnt {
614 labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
615 }
616 require.NoError(t, head.Init(0))
617
618 g, ctx := errgroup.WithContext(t.Context())
619 whileNotCanceled := func(f func() (bool, error)) error {
620 for ctx.Err() == nil {
621 cont, err := f()
622 if err != nil {
623 return err
624 }
625 if !cont {
626 return nil
627 }
628 }
629 return nil
630 }
631
632 // Create one channel for each write worker; the coordinator goroutine uses these
633 // channels to tell each write worker which timestamps it has to write.
634 writerTsCh := make([]chan uint64, writeConcurrency)
635 for writerTsChIdx := range writerTsCh {
636 writerTsCh[writerTsChIdx] = make(chan uint64)
637 }
638
639 // workerReadyWg is used to synchronize the start of the test,
640 // we only start the test once all workers signal that they're ready.
641 var workerReadyWg sync.WaitGroup
642 workerReadyWg.Add(writeConcurrency + readConcurrency)
643
644 // Start the write workers.
645 for wid := range writeConcurrency {
646 // Create copy of workerID to be used by worker routine.
647 workerID := wid
648
649 g.Go(func() error {
650 // The label sets which this worker will write.
651 workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)]
652
653 // Signal that this worker is ready.
654 workerReadyWg.Done()
655
656 return whileNotCanceled(func() (bool, error) {

Callers

nothing calls this directly

Calls 15

FromStringsFunction · 0.92
MustNewMatcherFunction · 0.92
newTestHeadFunction · 0.85
fFunction · 0.85
NewBlockQuerierFunction · 0.85
InitMethod · 0.80
queryFunction · 0.70
RunMethod · 0.65
ErrMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
AppenderV2Method · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…