MCPcopy
hub / github.com/benbjohnson/litestream / worker

Method worker

cmd/litestream-test/load.go:124–163  ·  view source on GitHub ↗
(ctx context.Context, db *sql.DB, workerID int, stats *LoadStats)

Source from the content-addressed store, hash-verified

122}
123
124func (c *LoadCommand) worker(ctx context.Context, db *sql.DB, workerID int, stats *LoadStats) {
125 ticker := time.NewTicker(time.Second / time.Duration(c.WriteRate/c.Workers))
126 defer ticker.Stop()
127
128 data := make([]byte, c.PayloadSize)
129 cryptorand.Read(data)
130
131 for {
132 select {
133 case <-ctx.Done():
134 return
135 case <-ticker.C:
136 // calculateRate() returns a modulation factor (not a multiplier
137 // on the number of ops). The ticker already fires at the base
138 // write-rate, so rate is used only to gate whether this tick
139 // produces an operation: <=0 means skip, <1 means probabilistic,
140 // >=1 means always fire.
141 rate := c.calculateRate(stats)
142 if rate <= 0 || (rate < 1.0 && rand.Float64() > rate) {
143 continue
144 }
145
146 if rand.Float64() < c.ReadRatio {
147 if err := c.performRead(db); err != nil {
148 atomic.AddInt64(&stats.errors, 1)
149 slog.Error("Read failed", "error", err)
150 } else {
151 atomic.AddInt64(&stats.reads, 1)
152 }
153 } else {
154 if err := c.performWrite(db, data); err != nil {
155 atomic.AddInt64(&stats.errors, 1)
156 slog.Error("Write failed", "error", err)
157 } else {
158 atomic.AddInt64(&stats.writes, 1)
159 }
160 }
161 }
162 }
163}
164
165func (c *LoadCommand) calculateRate(stats *LoadStats) float64 {
166 elapsed := time.Since(stats.startTime).Seconds()

Callers 1

generateLoadMethod · 0.95

Calls 6

calculateRateMethod · 0.95
performReadMethod · 0.95
performWriteMethod · 0.95
StopMethod · 0.80
ReadMethod · 0.45
ErrorMethod · 0.45

Tested by

no test coverage detected