MCPcopy
hub / github.com/prometheus/prometheus / Store

Method Store

storage/remote/queue_manager_test.go:1068–1152  ·  view source on GitHub ↗
(_ context.Context, req []byte, _ int)

Source from the content-addressed store, hash-verified

1066}
1067
1068func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResponseStats, error) {
1069 c.mtx.Lock()
1070 defer c.mtx.Unlock()
1071 if c.storeWait > 0 {
1072 time.Sleep(c.storeWait)
1073 }
1074 if c.returnError != nil {
1075 return WriteResponseStats{}, c.returnError
1076 }
1077
1078 reqBuf, err := compression.Decode(compression.Snappy, req, nil)
1079 if err != nil {
1080 return WriteResponseStats{}, err
1081 }
1082
1083 // Check if we've been told to inject err for this call.
1084 if len(c.injectedErrs) > 0 {
1085 c.currErr++
1086 if err = c.injectedErrs[c.currErr]; err != nil {
1087 return WriteResponseStats{}, err
1088 }
1089 }
1090
1091 var (
1092 reqProto *prompb.WriteRequest
1093 reqProtoV2 *writev2.Request
1094 )
1095 switch c.protoMsg {
1096 case remoteapi.WriteV1MessageType:
1097 reqProto = &prompb.WriteRequest{}
1098 err = proto.Unmarshal(reqBuf, reqProto)
1099 case remoteapi.WriteV2MessageType:
1100 // NOTE(bwplotka): v1 msg can be unmarshaled to v2 sometimes, without
1101 // errors.
1102 reqProtoV2 = &writev2.Request{}
1103 err = proto.Unmarshal(reqBuf, reqProtoV2)
1104 if err == nil {
1105 reqProto, err = v2RequestToWriteRequest(reqProtoV2)
1106 }
1107 }
1108 if err != nil {
1109 return WriteResponseStats{}, err
1110 }
1111
1112 rs := WriteResponseStats{}
1113 b := labels.NewScratchBuilder(0)
1114 for i, ts := range reqProto.Timeseries {
1115 labels := ts.ToLabels(&b, nil)
1116 tsID := labels.String()
1117 for j, s := range ts.Samples {
1118 st := int64(0)
1119 if reqProtoV2 != nil {
1120 // TODO(bwplotka): Refactor queue manager TestWriteClient for tighter validation
1121 // and native support for new RW2 features. For now we inject STs in RW2 case to the existing test suite.
1122 st = reqProtoV2.Timeseries[i].Samples[j].StartTimestamp
1123 }
1124 c.receivedSamples[tsID] = append(c.receivedSamples[tsID], writev2.Sample{
1125 StartTimestamp: st,

Callers

nothing calls this directly

Calls 8

Decode · Function · 0.92
NewScratchBuilder · Function · 0.92
v2RequestToWriteRequest · Function · 0.85
Lock · Method · 0.80
String · Method · 0.65
Unmarshal · Method · 0.45
ToLabels · Method · 0.45
IsFloatHistogram · Method · 0.45

Tested by

no test coverage detected