MCPcopy
hub / github.com/daptin/daptin / TestWebSocketConcurrentPublishers

Function TestWebSocketConcurrentPublishers

websocket_test.go:1423–1495  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1421}
1422
1423func TestWebSocketConcurrentPublishers(t *testing.T) {
1424 ensureServer()
1425 token := signUpAndGetToken(t)
1426
1427 const numPublishers = 5
1428 const msgsPerPublisher = 50
1429
1430 topicName := fmt.Sprintf("concurrent-pub-%d", time.Now().UnixNano())
1431
1432 // create topic
1433 wsSetup := dialWS(t, token)
1434 sendJSON(t, wsSetup, wsPayload{Id: nextReqId(), Method: "create-topicName", Payload: map[string]interface{}{"name": topicName}})
1435 recvJSON(t, wsSetup, 5*time.Second) // create response
1436 time.Sleep(300 * time.Millisecond)
1437
1438 // subscriber
1439 wsSub := dialWS(t, token)
1440 defer wsSub.Close()
1441 sendJSON(t, wsSub, wsPayload{Id: nextReqId(), Method: "subscribe", Payload: map[string]interface{}{"topicName": topicName}})
1442 recvJSON(t, wsSub, 5*time.Second) // subscribe ack
1443
1444 // launch concurrent publishers
1445 var wg sync.WaitGroup
1446 var sendErrors int64
1447 start := time.Now()
1448 for p := 0; p < numPublishers; p++ {
1449 wg.Add(1)
1450 go func(pubIdx int) {
1451 defer wg.Done()
1452 ws := dialWS(t, token)
1453 defer ws.Close()
1454 for m := 0; m < msgsPerPublisher; m++ {
1455 err := websocket.JSON.Send(ws, wsPayload{
1456 Method: "new-message",
1457 Payload: map[string]interface{}{
1458 "topicName": topicName,
1459 "message": map[string]interface{}{"pub": pubIdx, "seq": m},
1460 },
1461 })
1462 if err != nil {
1463 atomic.AddInt64(&sendErrors, 1)
1464 return
1465 }
1466 }
1467 }(p)
1468 }
1469 wg.Wait()
1470 sendDuration := time.Since(start)
1471 totalSent := int64(numPublishers*msgsPerPublisher) - sendErrors
1472 t.Logf("sent %d messages from %d publishers in %v (%.0f msg/s), %d send errors",
1473 totalSent, numPublishers, sendDuration, float64(totalSent)/sendDuration.Seconds(), sendErrors)
1474
1475 // receive what we can
1476 var recvCount int64
1477 recvStart := time.Now()
1478 for recvCount < totalSent {
1479 _, ok := tryRecvJSON(wsSub, 5*time.Second)
1480 if !ok {

Callers

nothing calls this directly

Calls 10

ensureServerFunction · 0.85
signUpAndGetTokenFunction · 0.85
dialWSFunction · 0.85
sendJSONFunction · 0.85
nextReqIdFunction · 0.85
recvJSONFunction · 0.85
tryRecvJSONFunction · 0.85
AddMethod · 0.80
CloseMethod · 0.65
DoneMethod · 0.45

Tested by

no test coverage detected