MCPcopy
hub / github.com/prometheus/prometheus / Store

Method Store

storage/remote/write_handler.go:95–147  ·  view source on GitHub ↗

Store implements remoteapi.writeStorage interface. TODO(bwplotka): Improve remoteapi.Store API. Right now it's confusing if PRWv1 flows should use WriteResponse or not. If it's not filled, it will be "confirmed zero" which caused partial error reporting on client side in the past. Temporary fix was

(r *http.Request, msgType remoteapi.WriteMessageType)

Source from the content-addressed store, hash-verified

93// Temporary fix was done to only care about WriteResponse stats for PRW2 (see https://github.com/prometheus/client_golang/pull/1927
94// but better approach would be to only confirm if explicit stats were injected.
95func (h *writeHandler) Store(r *http.Request, msgType remoteapi.WriteMessageType) (*remoteapi.WriteResponse, error) {
96 // Store receives request with decompressed content in body.
97 body, err := io.ReadAll(r.Body)
98 if err != nil {
99 h.logger.Error("Error reading remote write request body", "err", err.Error())
100 return nil, err
101 }
102
103 wr := remoteapi.NewWriteResponse()
104 if msgType == remoteapi.WriteV1MessageType {
105 // PRW 1.0 flow has different proto message and no partial write handling.
106 var req prompb.WriteRequest
107 if err := proto.Unmarshal(body, &req); err != nil {
108 // TODO(bwplotka): Add more context to responded error?
109 h.logger.Error("Error decoding v1 remote write request", "protobuf_message", msgType, "err", err.Error())
110 wr.SetStatusCode(http.StatusBadRequest)
111 return wr, err
112 }
113 if err = h.write(r.Context(), &req); err != nil {
114 switch {
115 case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp), errors.Is(err, storage.ErrTooOldSample):
116 // Indicated an out-of-order sample is a bad request to prevent retries.
117 wr.SetStatusCode(http.StatusBadRequest)
118 return wr, err
119 case isHistogramValidationError(err):
120 wr.SetStatusCode(http.StatusBadRequest)
121 return wr, err
122 default:
123 wr.SetStatusCode(http.StatusInternalServerError)
124 return wr, err
125 }
126 }
127 return wr, nil
128 }
129
130 // Remote Write 2.x proto message handling.
131 var req writev2.Request
132 if err := proto.Unmarshal(body, &req); err != nil {
133 // TODO(bwplotka): Add more context to responded error?
134 h.logger.Error("Error decoding v2 remote write request", "protobuf_message", msgType, "err", err.Error())
135 wr.SetStatusCode(http.StatusBadRequest)
136 return wr, err
137 }
138
139 respStats, errHTTPCode, err := h.writeV2(r.Context(), &req)
140 // Add stats required X-Prometheus-Remote-Write-Written-* response headers.
141 wr.Add(respStats)
142 if err != nil {
143 wr.SetStatusCode(errHTTPCode)
144 return wr, err
145 }
146 return wr, nil
147}
148
149func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
150 outOfOrderExemplarErrs := 0

Callers

nothing calls this directly

Calls 8

writeMethod · 0.95
writeV2Method · 0.95
ErrorMethod · 0.65
AddMethod · 0.65
UnmarshalMethod · 0.45
ContextMethod · 0.45
IsMethod · 0.45

Tested by

no test coverage detected