(ctx context.Context, req *prompb.WriteRequest)
| 147 | } |
| 148 | |
| 149 | func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) { |
| 150 | outOfOrderExemplarErrs := 0 |
| 151 | samplesWithInvalidLabels := 0 |
| 152 | samplesAppended := 0 |
| 153 | |
| 154 | app := &remoteWriteAppender{ |
| 155 | Appender: h.appendable.Appender(ctx), |
| 156 | maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), |
| 157 | } |
| 158 | |
| 159 | defer func() { |
| 160 | if err != nil { |
| 161 | _ = app.Rollback() |
| 162 | return |
| 163 | } |
| 164 | err = app.Commit() |
| 165 | if err != nil { |
| 166 | h.samplesAppendedWithoutMetadata.Add(float64(samplesAppended)) |
| 167 | } |
| 168 | }() |
| 169 | |
| 170 | b := labels.NewScratchBuilder(0) |
| 171 | for _, ts := range req.Timeseries { |
| 172 | ls := ts.ToLabels(&b, nil) |
| 173 | |
| 174 | // TODO(bwplotka): Even as per 1.0 spec, this should be a 400 error, while other samples are |
| 175 | // potentially written. Perhaps unify with fixed writeV2 implementation a bit. |
| 176 | if !ls.Has(labels.MetricName) || !ls.IsValid(model.UTF8Validation) { |
| 177 | h.logger.Warn("Invalid metric names or labels", "got", ls.String()) |
| 178 | samplesWithInvalidLabels++ |
| 179 | continue |
| 180 | } else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { |
| 181 | h.logger.Warn("Invalid labels for series.", "labels", ls.String(), "duplicated_label", duplicateLabel) |
| 182 | samplesWithInvalidLabels++ |
| 183 | continue |
| 184 | } |
| 185 | |
| 186 | if err := h.appendV1Samples(app, ts.Samples, ls); err != nil { |
| 187 | return err |
| 188 | } |
| 189 | samplesAppended += len(ts.Samples) |
| 190 | |
| 191 | for _, ep := range ts.Exemplars { |
| 192 | e := ep.ToExemplar(&b, nil) |
| 193 | if _, err := app.AppendExemplar(0, ls, e); err != nil { |
| 194 | switch { |
| 195 | case errors.Is(err, storage.ErrOutOfOrderExemplar): |
| 196 | outOfOrderExemplarErrs++ |
| 197 | h.logger.Debug("Out of order exemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) |
| 198 | default: |
| 199 | // Since exemplar storage is still experimental, we don't fail the request on ingestion errors |
| 200 | h.logger.Debug("Error while adding exemplar in AppendExemplar", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err) |
| 201 | } |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | if err = h.appendV1Histograms(app, ts.Histograms, ls); err != nil { |
| 206 | return err |
no test coverage detected