(_ context.Context, req []byte, _ int)
| 1066 | } |
| 1067 | |
| 1068 | func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) (WriteResponseStats, error) { |
| 1069 | c.mtx.Lock() |
| 1070 | defer c.mtx.Unlock() |
| 1071 | if c.storeWait > 0 { |
| 1072 | time.Sleep(c.storeWait) |
| 1073 | } |
| 1074 | if c.returnError != nil { |
| 1075 | return WriteResponseStats{}, c.returnError |
| 1076 | } |
| 1077 | |
| 1078 | reqBuf, err := compression.Decode(compression.Snappy, req, nil) |
| 1079 | if err != nil { |
| 1080 | return WriteResponseStats{}, err |
| 1081 | } |
| 1082 | |
| 1083 | // Check if we've been told to inject err for this call. |
| 1084 | if len(c.injectedErrs) > 0 { |
| 1085 | c.currErr++ |
| 1086 | if err = c.injectedErrs[c.currErr]; err != nil { |
| 1087 | return WriteResponseStats{}, err |
| 1088 | } |
| 1089 | } |
| 1090 | |
| 1091 | var ( |
| 1092 | reqProto *prompb.WriteRequest |
| 1093 | reqProtoV2 *writev2.Request |
| 1094 | ) |
| 1095 | switch c.protoMsg { |
| 1096 | case remoteapi.WriteV1MessageType: |
| 1097 | reqProto = &prompb.WriteRequest{} |
| 1098 | err = proto.Unmarshal(reqBuf, reqProto) |
| 1099 | case remoteapi.WriteV2MessageType: |
| 1100 | // NOTE(bwplotka): v1 msg can be unmarshaled to v2 sometimes, without |
| 1101 | // errors. |
| 1102 | reqProtoV2 = &writev2.Request{} |
| 1103 | err = proto.Unmarshal(reqBuf, reqProtoV2) |
| 1104 | if err == nil { |
| 1105 | reqProto, err = v2RequestToWriteRequest(reqProtoV2) |
| 1106 | } |
| 1107 | } |
| 1108 | if err != nil { |
| 1109 | return WriteResponseStats{}, err |
| 1110 | } |
| 1111 | |
| 1112 | rs := WriteResponseStats{} |
| 1113 | b := labels.NewScratchBuilder(0) |
| 1114 | for i, ts := range reqProto.Timeseries { |
| 1115 | labels := ts.ToLabels(&b, nil) |
| 1116 | tsID := labels.String() |
| 1117 | for j, s := range ts.Samples { |
| 1118 | st := int64(0) |
| 1119 | if reqProtoV2 != nil { |
| 1120 | // TODO(bwplotka): Refactor queue manager TestWriteClient for tighter validation |
| 1121 | // and native support for new RW2 features. For now we inject STs in RW2 case to the existing test suite. |
| 1122 | st = reqProtoV2.Timeseries[i].Samples[j].StartTimestamp |
| 1123 | } |
| 1124 | c.receivedSamples[tsID] = append(c.receivedSamples[tsID], writev2.Sample{ |
| 1125 | StartTimestamp: st, |
nothing calls this directly
no test coverage detected