MCPcopy
hub / github.com/lindb/lindb / DownSamplingMultiSeriesInto

Function DownSamplingMultiSeriesInto

aggregation/down_sampling_agg.go:74–124  ·  view source on GitHub ↗

DownSamplingMultiSeriesInto merges field data from the source time range => target time range. Data will be merged into DownSamplingResult. For example: source range [5,182] => target range [0,6], ratio: 30, source interval: 10s, target interval: 5min.

(
	target timeutil.SlotRange, ratio uint16, baseSlot uint16,
	fieldType field.Type, decoders []*encoding.TSDDecoder,
	emitValue func(targetPos int, value float64),
)

Source from the content-addressed store, hash-verified

72// data will be merged into DownSamplingResult
73// for example: source range[5,182]=>target range[0,6], ratio:30, source interval:10s, target interval:5min.
74func DownSamplingMultiSeriesInto(
75 target timeutil.SlotRange, ratio uint16, baseSlot uint16,
76 fieldType field.Type, decoders []*encoding.TSDDecoder,
77 emitValue func(targetPos int, value float64),
78) {
79 targetValues := make([]float64, infBlockSize)
80 length := int(target.End-target.Start) + 1
81 if length <= infBlockSize {
82 // on stack
83 targetValues = targetValues[:length]
84 } else {
85 // on heap
86 targetValues = getFloat64Slice(length)
87 defer putFloat64Slice(&targetValues)
88 }
89 // first loop: filled target values with inf value,
90 // inf value is invalid, and won't be emitted after down sampling
91 fillInfBlock(targetValues)
92 bs := int(baseSlot)
93 // second loop: iterating tsd decoder
94 for _, decoder := range decoders {
95 if decoder == nil {
96 continue
97 }
98 for movingSourceSlot := decoder.StartTime(); movingSourceSlot <= decoder.EndTime(); movingSourceSlot++ {
99 if !decoder.HasValueWithSlot(movingSourceSlot) {
100 continue
101 }
102 value := math.Float64frombits(decoder.Value())
103 targetPos := bs + int(movingSourceSlot/ratio) - int(target.Start)
104 if targetPos < 0 {
105 continue
106 }
107 // exhausted
108 if targetPos >= length {
109 break
110 }
111 // not set before
112 if math.IsInf(targetValues[targetPos], 1) {
113 targetValues[targetPos] = value
114 // set before, aggregate
115 } else {
116 targetValues[targetPos] = fieldType.AggType().Aggregate(targetValues[targetPos], value)
117 }
118 }
119 }
120 // third loop, emit down sampling data
121 for offset, value := range targetValues {
122 emitValue(offset, value)
123 }
124}
125
126// DownSampling merges field data from source time range => target time range,
127// for example: source range[5,182]=>target range[0,6], ratio:30, source interval:10s, target interval:5min.

Callers 1

mergeMethod · 0.92

Calls 9

getFloat64SliceFunction · 0.85
putFloat64SliceFunction · 0.85
fillInfBlockFunction · 0.85
StartTimeMethod · 0.80
EndTimeMethod · 0.80
HasValueWithSlotMethod · 0.80
ValueMethod · 0.65
AggregateMethod · 0.65
AggTypeMethod · 0.65

Tested by

no test coverage detected