TestStoreWatchSlowConsumer ensures that slow consumers are handled properly. Since Watcher.EventChan() has a buffer of size 100, we can only queue 100 events per watcher. If the consumer cannot consume the events in time and another event arrives, the channel is closed and the event is discarded. This test ensures that after closing the channel, the store can continue to operate correctly.
(t *testing.T)
| 822 | // This test ensures that after closing the channel, the store can continue |
| 823 | // to operate correctly. |
| 824 | func TestStoreWatchSlowConsumer(t *testing.T) { |
| 825 | s := v2store.New() |
| 826 | s.Watch("/foo", true, true, 0) // stream must be true |
| 827 | // Fill watch channel with 100 events |
| 828 | for i := 1; i <= 100; i++ { |
| 829 | s.Set("/foo", false, fmt.Sprint(i), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) // ok |
| 830 | } |
| 831 | // assert.Equal(t, s.WatcherHub.count, int64(1)) |
| 832 | s.Set("/foo", false, "101", v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) // ok |
| 833 | // remove watcher |
| 834 | // assert.Equal(t, s.WatcherHub.count, int64(0)) |
| 835 | s.Set("/foo", false, "102", v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) // must not panic |
| 836 | } |
| 837 | |
| 838 | // Performs a non-blocking select on an event channel. |
| 839 | func nbselect(c <-chan *v2store.Event) *v2store.Event { |