MCPcopy Index your code
hub / github.com/nutsdb/nutsdb / TestWatchManager_SubscribeAndSendMessage

Function TestWatchManager_SubscribeAndSendMessage

watch_manager_test.go:205–398  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

203}
204
205func TestWatchManager_SubscribeAndSendMessage(t *testing.T) {
206 t.Run("subscribe and send message", func(t *testing.T) {
207 wm := startDistributor(t)
208 defer func() { _ = wm.close() }()
209
210 bucket := "test"
211 key := "test"
212 value := []byte("updated value")
213
214 receiveChan, _ := wmSubscribe(t, wm, bucket, key)
215 wmSendMessage(t, wm, bucket, key, value)
216
217 msg := wmReceiveMessage(t, receiveChan, bucket, key, 1*time.Second)
218 assert.Equal(t, value, msg.Value)
219 })
220
221 t.Run("subscribe and drop messages", func(t *testing.T) {
222 wm := startDistributor(t)
223 defer func() { _ = wm.close() }()
224
225 bucket := "bucket_test"
226 key := "key_test"
227
228 // Subscribe but don't receive messages
229 receiveChan, _ := wmSubscribe(t, wm, bucket, key)
230
231 // Send enough messages to:
232 // 1. Fill receiveChan buffer (1024)
233 // 2. Trigger 100+ drops to force-unsubscribe
234 // Total: 1100+ messages
235 messagesToSend := 1124
236 wmSendMessages(t, wm, bucket, key, messagesToSend)
237
238 time.Sleep(100 * time.Millisecond)
239 drained := wmDrainChannel(t, receiveChan, 2*time.Second)
240
241 assert.Equal(t, 1024, drained, "should have buffered exactly 1024 messages")
242
243 // Verify subscriber was force-unsubscribed due to slow consumption
244 wmVerifySubscriberRemoved(t, wm, bucket, key)
245 wmVerifyBucketRemoved(t, wm, bucket)
246 })
247
248 t.Run("multiple subscribers for the same key", func(t *testing.T) {
249 wm := startDistributor(t)
250 defer func() { _ = wm.close() }()
251
252 bucket := "bucket_test"
253 key := "key_test"
254 numSubscribers := 10
255 messagesPerSubscriber := 10
256
257 wg := sync.WaitGroup{}
258
259 // start subscribers goroutines for the same key
260 for i := 0; i < numSubscribers; i++ {
261 wg.Add(1)
262 go func(i int) {

Callers

nothing calls this directly

Calls 15

startDistributorFunction · 0.85
wmSubscribeFunction · 0.85
wmSendMessageFunction · 0.85
wmReceiveMessageFunction · 0.85
wmSendMessagesFunction · 0.85
wmDrainChannelFunction · 0.85
wmVerifyBucketRemovedFunction · 0.85
wmReceiveMessagesFunction · 0.85
NewMessageFunction · 0.85
wmStartReceiverFunction · 0.85
wmStartSenderFunction · 0.85

Tested by

no test coverage detected