(t *testing.T)
| 1421 | } |
| 1422 | |
| 1423 | func TestWebSocketConcurrentPublishers(t *testing.T) { |
| 1424 | ensureServer() |
| 1425 | token := signUpAndGetToken(t) |
| 1426 | |
| 1427 | const numPublishers = 5 |
| 1428 | const msgsPerPublisher = 50 |
| 1429 | |
| 1430 | topicName := fmt.Sprintf("concurrent-pub-%d", time.Now().UnixNano()) |
| 1431 | |
| 1432 | // create topic |
| 1433 | wsSetup := dialWS(t, token) |
| 1434 | sendJSON(t, wsSetup, wsPayload{Id: nextReqId(), Method: "create-topicName", Payload: map[string]interface{}{"name": topicName}}) |
| 1435 | recvJSON(t, wsSetup, 5*time.Second) // create response |
| 1436 | time.Sleep(300 * time.Millisecond) |
| 1437 | |
| 1438 | // subscriber |
| 1439 | wsSub := dialWS(t, token) |
| 1440 | defer wsSub.Close() |
| 1441 | sendJSON(t, wsSub, wsPayload{Id: nextReqId(), Method: "subscribe", Payload: map[string]interface{}{"topicName": topicName}}) |
| 1442 | recvJSON(t, wsSub, 5*time.Second) // subscribe ack |
| 1443 | |
| 1444 | // launch concurrent publishers |
| 1445 | var wg sync.WaitGroup |
| 1446 | var sendErrors int64 |
| 1447 | start := time.Now() |
| 1448 | for p := 0; p < numPublishers; p++ { |
| 1449 | wg.Add(1) |
| 1450 | go func(pubIdx int) { |
| 1451 | defer wg.Done() |
| 1452 | ws := dialWS(t, token) |
| 1453 | defer ws.Close() |
| 1454 | for m := 0; m < msgsPerPublisher; m++ { |
| 1455 | err := websocket.JSON.Send(ws, wsPayload{ |
| 1456 | Method: "new-message", |
| 1457 | Payload: map[string]interface{}{ |
| 1458 | "topicName": topicName, |
| 1459 | "message": map[string]interface{}{"pub": pubIdx, "seq": m}, |
| 1460 | }, |
| 1461 | }) |
| 1462 | if err != nil { |
| 1463 | atomic.AddInt64(&sendErrors, 1) |
| 1464 | return |
| 1465 | } |
| 1466 | } |
| 1467 | }(p) |
| 1468 | } |
| 1469 | wg.Wait() |
| 1470 | sendDuration := time.Since(start) |
| 1471 | totalSent := int64(numPublishers*msgsPerPublisher) - sendErrors |
| 1472 | t.Logf("sent %d messages from %d publishers in %v (%.0f msg/s), %d send errors", |
| 1473 | totalSent, numPublishers, sendDuration, float64(totalSent)/sendDuration.Seconds(), sendErrors) |
| 1474 | |
| 1475 | // receive what we can |
| 1476 | var recvCount int64 |
| 1477 | recvStart := time.Now() |
| 1478 | for recvCount < totalSent { |
| 1479 | _, ok := tryRecvJSON(wsSub, 5*time.Second) |
| 1480 | if !ok { |
nothing calls this directly
no test coverage detected