MCPcopy
hub / github.com/daptin/daptin / TestWebSocketManySubscribers

Function TestWebSocketManySubscribers

websocket_test.go:1027–1133  ·  view source on GitHub ↗

===== SCALABILITY / STRESS TESTS =====

(t *testing.T)

Source from the content-addressed store, hash-verified

1025// ===== SCALABILITY / STRESS TESTS =====
1026
1027func TestWebSocketManySubscribers(t *testing.T) {
1028 ensureServer()
1029 token := signUpAndGetToken(t)
1030
1031 // keep under CI connection limits (2-4 core runners)
1032 const numSubscribers = 20
1033
1034 topicName := fmt.Sprintf("fan-out-%d", time.Now().UnixNano())
1035
1036 // create topic
1037 wsCreator := dialWS(t, token)
1038 sendJSON(t, wsCreator, wsPayload{Id: nextReqId(), Method: "create-topicName", Payload: map[string]interface{}{"name": topicName}})
1039 recvJSON(t, wsCreator, 5*time.Second) // create response
1040 time.Sleep(300 * time.Millisecond)
1041
1042 // connect and subscribe N clients
1043 subscribers := make([]*websocket.Conn, numSubscribers)
1044 var subWg sync.WaitGroup
1045 for i := 0; i < numSubscribers; i++ {
1046 subWg.Add(1)
1047 go func(idx int) {
1048 defer subWg.Done()
1049 config, err := websocket.NewConfig(wsURL, wsBaseAddress)
1050 if err != nil {
1051 t.Errorf("subscriber %d: config error: %v", idx, err)
1052 return
1053 }
1054 config.Header.Set("Authorization", "Bearer "+token)
1055 config.Header.Set("Cookie", "token="+token)
1056 ws, err := websocket.DialConfig(config)
1057 if err != nil {
1058 return
1059 }
1060 // consume session-open
1061 ws.SetReadDeadline(time.Now().Add(5 * time.Second))
1062 var sessionMsg resource.WsOutMessage
1063 if err := websocket.JSON.Receive(ws, &sessionMsg); err != nil {
1064 ws.Close()
1065 return
1066 }
1067 if err := websocket.JSON.Send(ws, wsPayload{Id: nextReqId(), Method: "subscribe", Payload: map[string]interface{}{"topicName": topicName}}); err != nil {
1068 ws.Close()
1069 return
1070 }
1071 ws.SetReadDeadline(time.Now().Add(30 * time.Second))
1072 var ack resource.WsOutMessage
1073 if err := websocket.JSON.Receive(ws, &ack); err != nil {
1074 ws.Close()
1075 return
1076 }
1077 subscribers[idx] = ws
1078 }(i)
1079 }
1080 subWg.Wait()
1081
1082 // count how many actually connected
1083 var connectedCount int
1084 for _, ws := range subscribers {

Callers

nothing calls this directly

Calls 12

ensureServer (Function) · 0.85
signUpAndGetToken (Function) · 0.85
dialWS (Function) · 0.85
sendJSON (Function) · 0.85
nextReqId (Function) · 0.85
recvJSON (Function) · 0.85
make (Function) · 0.85
tryRecvJSON (Function) · 0.85
Add (Method) · 0.80
Set (Method) · 0.80
Close (Method) · 0.65
Done (Method) · 0.45

Tested by

no test coverage detected