(t *testing.T)
| 203 | } |
| 204 | |
| 205 | func TestWatchManager_SubscribeAndSendMessage(t *testing.T) { |
| 206 | t.Run("subscribe and send message", func(t *testing.T) { |
| 207 | wm := startDistributor(t) |
| 208 | defer func() { _ = wm.close() }() |
| 209 | |
| 210 | bucket := "test" |
| 211 | key := "test" |
| 212 | value := []byte("updated value") |
| 213 | |
| 214 | receiveChan, _ := wmSubscribe(t, wm, bucket, key) |
| 215 | wmSendMessage(t, wm, bucket, key, value) |
| 216 | |
| 217 | msg := wmReceiveMessage(t, receiveChan, bucket, key, 1*time.Second) |
| 218 | assert.Equal(t, value, msg.Value) |
| 219 | }) |
| 220 | |
| 221 | t.Run("subscribe and drop messages", func(t *testing.T) { |
| 222 | wm := startDistributor(t) |
| 223 | defer func() { _ = wm.close() }() |
| 224 | |
| 225 | bucket := "bucket_test" |
| 226 | key := "key_test" |
| 227 | |
| 228 | // Subscribe but don't receive messages |
| 229 | receiveChan, _ := wmSubscribe(t, wm, bucket, key) |
| 230 | |
| 231 | // Send enough messages to: |
| 232 | // 1. Fill receiveChan buffer (1024) |
| 233 | // 2. Trigger 100+ drops to force-unsubscribe |
| 234 | // Total: 1100+ messages |
| 235 | messagesToSend := 1124 |
| 236 | wmSendMessages(t, wm, bucket, key, messagesToSend) |
| 237 | |
| 238 | time.Sleep(100 * time.Millisecond) |
| 239 | drained := wmDrainChannel(t, receiveChan, 2*time.Second) |
| 240 | |
| 241 | assert.Equal(t, 1024, drained, "should have buffered exactly 1024 messages") |
| 242 | |
| 243 | // Verify subscriber was force-unsubscribed due to slow consumption |
| 244 | wmVerifySubscriberRemoved(t, wm, bucket, key) |
| 245 | wmVerifyBucketRemoved(t, wm, bucket) |
| 246 | }) |
| 247 | |
| 248 | t.Run("multiple subscribers for the same key", func(t *testing.T) { |
| 249 | wm := startDistributor(t) |
| 250 | defer func() { _ = wm.close() }() |
| 251 | |
| 252 | bucket := "bucket_test" |
| 253 | key := "key_test" |
| 254 | numSubscribers := 10 |
| 255 | messagesPerSubscriber := 10 |
| 256 | |
| 257 | wg := sync.WaitGroup{} |
| 258 | |
| 259 | // start subscribers goroutines for the same key |
| 260 | for i := 0; i < numSubscribers; i++ { |
| 261 | wg.Add(1) |
| 262 | go func(i int) { |
nothing calls this directly
no test coverage detected