(t *testing.T)
| 526 | } |
| 527 | |
| 528 | func TestWatchRemoteKVStore(t *testing.T) { |
| 529 | var wg sync.WaitGroup |
| 530 | var synced atomic.Bool |
| 531 | |
| 532 | run := func(ctx context.Context, rc RemoteIDCache) context.CancelFunc { |
| 533 | ctx, cancel := context.WithCancel(ctx) |
| 534 | wg.Go(func() { |
| 535 | rc.Watch(ctx, func(context.Context) { synced.Store(true) }) |
| 536 | }) |
| 537 | return cancel |
| 538 | } |
| 539 | |
| 540 | stop := func(cancel context.CancelFunc) { |
| 541 | cancel() |
| 542 | wg.Wait() |
| 543 | synced.Store(false) |
| 544 | } |
| 545 | |
| 546 | global := Allocator{logger: hivetest.Logger(t), remoteCaches: make(map[string]*remoteCache)} |
| 547 | events := make(AllocatorEventChan, 10) |
| 548 | |
| 549 | ctx, cancel := context.WithCancel(context.Background()) |
| 550 | |
| 551 | // Ensure that the goroutines are stopped and waited for even if the test fails. |
| 552 | defer stop(cancel) |
| 553 | |
| 554 | newRemoteAllocator := func(backend Backend) *Allocator { |
| 555 | remote, err := NewAllocator(hivetest.Logger(t), TestAllocatorKey(""), backend, WithEvents(events), WithoutAutostart(), WithoutGC()) |
| 556 | require.NoError(t, err) |
| 557 | |
| 558 | return remote |
| 559 | } |
| 560 | |
| 561 | // Add a new remote cache, and assert that it is registered correctly |
| 562 | // and the proper events are emitted |
| 563 | backend := newDummyBackend() |
| 564 | remote := newRemoteAllocator(backend) |
| 565 | |
| 566 | backend.AllocateID(ctx, idpool.ID(1), TestAllocatorKey("foo")) |
| 567 | backend.AllocateID(ctx, idpool.ID(2), TestAllocatorKey("baz")) |
| 568 | |
| 569 | rc := global.NewRemoteCache("remote", remote) |
| 570 | require.False(t, rc.Synced(), "The cache should not be synchronized") |
| 571 | cancel = run(ctx, rc) |
| 572 | |
| 573 | require.Equal(t, AllocatorEvent{ID: idpool.ID(1), Key: TestAllocatorKey("foo"), Typ: AllocatorChangeUpsert}, <-events) |
| 574 | require.Equal(t, AllocatorEvent{ID: idpool.ID(2), Key: TestAllocatorKey("baz"), Typ: AllocatorChangeUpsert}, <-events) |
| 575 | |
| 576 | ev := <-events |
| 577 | require.Equal(t, AllocatorChangeSync, ev.Typ) |
| 578 | require.False(t, rc.Synced(), "The cache should not be synchronized") |
| 579 | close(ev.Done) |
| 580 | |
| 581 | require.Eventually(t, func() bool { |
| 582 | global.remoteCachesMutex.RLock() |
| 583 | defer global.remoteCachesMutex.RUnlock() |
| 584 | return global.remoteCaches["remote"] == rc |
| 585 | }, 1*time.Second, 10*time.Millisecond) |
nothing calls this directly
no test coverage detected
searching dependent graphs…