Store implements remoteapi.writeStorage interface. TODO(bwplotka): Improve remoteapi.Store API. Right now it's confusing if PRWv1 flows should use WriteResponse or not. If it's not filled, it will be "confirmed zero" which caused partial error reporting on client side in the past. Temporary fix was
(r *http.Request, msgType remoteapi.WriteMessageType)
| 93 | // Temporary fix was done to only care about WriteResponse stats for PRW2 (see https://github.com/prometheus/client_golang/pull/1927 |
| 94 | // but better approach would be to only confirm if explicit stats were injected. |
| 95 | func (h *writeHandler) Store(r *http.Request, msgType remoteapi.WriteMessageType) (*remoteapi.WriteResponse, error) { |
| 96 | // Store receives request with decompressed content in body. |
| 97 | body, err := io.ReadAll(r.Body) |
| 98 | if err != nil { |
| 99 | h.logger.Error("Error reading remote write request body", "err", err.Error()) |
| 100 | return nil, err |
| 101 | } |
| 102 | |
| 103 | wr := remoteapi.NewWriteResponse() |
| 104 | if msgType == remoteapi.WriteV1MessageType { |
| 105 | // PRW 1.0 flow has different proto message and no partial write handling. |
| 106 | var req prompb.WriteRequest |
| 107 | if err := proto.Unmarshal(body, &req); err != nil { |
| 108 | // TODO(bwplotka): Add more context to responded error? |
| 109 | h.logger.Error("Error decoding v1 remote write request", "protobuf_message", msgType, "err", err.Error()) |
| 110 | wr.SetStatusCode(http.StatusBadRequest) |
| 111 | return wr, err |
| 112 | } |
| 113 | if err = h.write(r.Context(), &req); err != nil { |
| 114 | switch { |
| 115 | case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample): |
| 116 | // Indicated an out-of-order sample is a bad request to prevent retries. |
| 117 | wr.SetStatusCode(http.StatusBadRequest) |
| 118 | return wr, err |
| 119 | case isHistogramValidationError(err): |
| 120 | wr.SetStatusCode(http.StatusBadRequest) |
| 121 | return wr, err |
| 122 | default: |
| 123 | wr.SetStatusCode(http.StatusInternalServerError) |
| 124 | return wr, err |
| 125 | } |
| 126 | } |
| 127 | return wr, nil |
| 128 | } |
| 129 | |
| 130 | // Remote Write 2.x proto message handling. |
| 131 | var req writev2.Request |
| 132 | if err := proto.Unmarshal(body, &req); err != nil { |
| 133 | // TODO(bwplotka): Add more context to responded error? |
| 134 | h.logger.Error("Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error()) |
| 135 | wr.SetStatusCode(http.StatusBadRequest) |
| 136 | return wr, err |
| 137 | } |
| 138 | |
| 139 | respStats, errHTTPCode, err := h.writeV2(r.Context(), &req) |
| 140 | // Add stats required X-Prometheus-Remote-Write-Written-* response headers. |
| 141 | wr.Add(respStats) |
| 142 | if err != nil { |
| 143 | wr.SetStatusCode(errHTTPCode) |
| 144 | return wr, err |
| 145 | } |
| 146 | return wr, nil |
| 147 | } |
| 148 | |
| 149 | func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { |
| 150 | outOfOrderExemplarErrs := 0 |