(operatorConfig OperatorConfig, path string, qParams ...map[string]string)
| 72 | } |
| 73 | |
| 74 | func streamLogs(operatorConfig OperatorConfig, path string, qParams ...map[string]string) error { |
| 75 | interrupt := make(chan os.Signal, 1) |
| 76 | signal.Notify(interrupt, os.Interrupt) |
| 77 | |
| 78 | req, err := operatorRequest(operatorConfig, "GET", path, nil, qParams...) |
| 79 | if err != nil { |
| 80 | return err |
| 81 | } |
| 82 | |
| 83 | values := req.URL.Query() |
| 84 | if operatorConfig.Telemetry { |
| 85 | values.Set("clientID", operatorConfig.ClientID) |
| 86 | } |
| 87 | |
| 88 | req.URL.RawQuery = values.Encode() |
| 89 | wsURL := req.URL.String() |
| 90 | wsURL = strings.Replace(wsURL, "http", "ws", 1) |
| 91 | |
| 92 | header := http.Header{} |
| 93 | header.Set("CortexAPIVersion", consts.CortexVersion) |
| 94 | awsClient, err := aws.New() |
| 95 | if err != nil { |
| 96 | return err |
| 97 | } |
| 98 | |
| 99 | authHeader, err := awsClient.IdentityRequestAsHeader() |
| 100 | if err != nil { |
| 101 | return err |
| 102 | } |
| 103 | header.Set(consts.AuthHeader, authHeader) |
| 104 | |
| 105 | var dialer = websocket.Dialer{ |
| 106 | TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, |
| 107 | } |
| 108 | |
| 109 | connection, response, err := dialer.Dial(wsURL, header) |
| 110 | if err != nil && response == nil { |
| 111 | return ErrorFailedToConnectOperator(err, operatorConfig.EnvName, strings.Replace(operatorConfig.OperatorEndpoint, "http", "ws", 1)) |
| 112 | } |
| 113 | defer response.Body.Close() |
| 114 | |
| 115 | if err != nil { |
| 116 | bodyBytes, err := ioutil.ReadAll(response.Body) |
| 117 | if err != nil || bodyBytes == nil || string(bodyBytes) == "" { |
| 118 | return ErrorFailedToConnectOperator(err, operatorConfig.EnvName, strings.Replace(operatorConfig.OperatorEndpoint, "http", "ws", 1)) |
| 119 | } |
| 120 | var output schema.ErrorResponse |
| 121 | err = json.Unmarshal(bodyBytes, &output) |
| 122 | if err != nil || output.Message == "" { |
| 123 | return ErrorOperatorStreamResponseUnknown(string(bodyBytes), response.StatusCode) |
| 124 | } |
| 125 | return errors.WithStack(&errors.Error{ |
| 126 | Kind: output.Kind, |
| 127 | Message: output.Message, |
| 128 | NoTelemetry: true, |
| 129 | }) |
| 130 | } |
| 131 | defer connection.Close() |
no test coverage detected