| 480 | } |
| 481 | |
| 482 | func TestQueue_concurrently(t *testing.T) { |
| 483 | dir := filepath.Join(t.TempDir(), t.Name()) |
| 484 | |
| 485 | q, err := NewQueue(dir, 128*1024*1024) |
| 486 | assert.NoError(t, err) |
| 487 | |
| 488 | var ( |
| 489 | messages sync.Map |
| 490 | wait sync.WaitGroup |
| 491 | ) |
| 492 | |
| 493 | sendMessages := make([]map[string][]byte, 4) |
| 494 | |
| 495 | for i := 0; i < 4; i++ { |
| 496 | sendMessages[i] = mockMessageData(i, 100) |
| 497 | } |
| 498 | wait.Add(4) |
| 499 | |
| 500 | for i := 0; i < 4; i++ { |
| 501 | msg := sendMessages[i] |
| 502 | |
| 503 | go func() { |
| 504 | defer wait.Done() |
| 505 | |
| 506 | for k, v := range msg { |
| 507 | err := q.Put(v) |
| 508 | messages.Store(k, v) |
| 509 | |
| 510 | if err != nil { |
| 511 | panic("get err") |
| 512 | } |
| 513 | } |
| 514 | }() |
| 515 | } |
| 516 | |
| 517 | wait.Wait() |
| 518 | |
| 519 | for i := 0; i < 400; i++ { |
| 520 | data, err := q.Get(int64(i)) |
| 521 | assert.NoError(t, err) |
| 522 | messages.Delete(string(data)) |
| 523 | } |
| 524 | |
| 525 | messages.Range(func(key, value interface{}) bool { |
| 526 | panic("get data") |
| 527 | }) |
| 528 | |
| 529 | q.Close() |
| 530 | } |
| 531 | |
| 532 | func TestQueue_GC(t *testing.T) { |
| 533 | dir := filepath.Join(t.TempDir(), t.Name()) |