MCPcopy
hub / github.com/lindb/lindb / TestQueue_concurrently

Function TestQueue_concurrently

pkg/queue/queue_test.go:482–530  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

480}
481
482func TestQueue_concurrently(t *testing.T) {
483 dir := filepath.Join(t.TempDir(), t.Name())
484
485 q, err := NewQueue(dir, 128*1024*1024)
486 assert.NoError(t, err)
487
488 var (
489 messages sync.Map
490 wait sync.WaitGroup
491 )
492
493 sendMessages := make([]map[string][]byte, 4)
494
495 for i := 0; i < 4; i++ {
496 sendMessages[i] = mockMessageData(i, 100)
497 }
498 wait.Add(4)
499
500 for i := 0; i < 4; i++ {
501 msg := sendMessages[i]
502
503 go func() {
504 defer wait.Done()
505
506 for k, v := range msg {
507 err := q.Put(v)
508 messages.Store(k, v)
509
510 if err != nil {
511 panic("get err")
512 }
513 }
514 }()
515 }
516
517 wait.Wait()
518
519 for i := 0; i < 400; i++ {
520 data, err := q.Get(int64(i))
521 assert.NoError(t, err)
522 messages.Delete(string(data))
523 }
524
525 messages.Range(func(key, value interface{}) bool {
526 panic("get data")
527 })
528
529 q.Close()
530}
531
532func TestQueue_GC(t *testing.T) {
533 dir := filepath.Join(t.TempDir(), t.Name())

Callers

nothing calls this directly

Calls 10

PutMethod · 0.95
GetMethod · 0.95
CloseMethod · 0.95
NewQueueFunction · 0.85
mockMessageDataFunction · 0.85
DoneMethod · 0.80
WaitMethod · 0.80
NameMethod · 0.65
AddMethod · 0.65
DeleteMethod · 0.65

Tested by

no test coverage detected