(ctx context.Context, db *sql.DB, workerID int, stats *LoadStats)
| 122 | } |
| 123 | |
| 124 | func (c *LoadCommand) worker(ctx context.Context, db *sql.DB, workerID int, stats *LoadStats) { |
| 125 | ticker := time.NewTicker(time.Second / time.Duration(c.WriteRate/c.Workers)) |
| 126 | defer ticker.Stop() |
| 127 | |
| 128 | data := make([]byte, c.PayloadSize) |
| 129 | cryptorand.Read(data) |
| 130 | |
| 131 | for { |
| 132 | select { |
| 133 | case <-ctx.Done(): |
| 134 | return |
| 135 | case <-ticker.C: |
| 136 | // calculateRate() returns a modulation factor (not a multiplier |
| 137 | // on the number of ops). The ticker already fires at the base |
| 138 | // write-rate, so rate is used only to gate whether this tick |
| 139 | // produces an operation: <=0 means skip, <1 means probabilistic, |
| 140 | // >=1 means always fire. |
| 141 | rate := c.calculateRate(stats) |
| 142 | if rate <= 0 || (rate < 1.0 && rand.Float64() > rate) { |
| 143 | continue |
| 144 | } |
| 145 | |
| 146 | if rand.Float64() < c.ReadRatio { |
| 147 | if err := c.performRead(db); err != nil { |
| 148 | atomic.AddInt64(&stats.errors, 1) |
| 149 | slog.Error("Read failed", "error", err) |
| 150 | } else { |
| 151 | atomic.AddInt64(&stats.reads, 1) |
| 152 | } |
| 153 | } else { |
| 154 | if err := c.performWrite(db, data); err != nil { |
| 155 | atomic.AddInt64(&stats.errors, 1) |
| 156 | slog.Error("Write failed", "error", err) |
| 157 | } else { |
| 158 | atomic.AddInt64(&stats.writes, 1) |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | func (c *LoadCommand) calculateRate(stats *LoadStats) float64 { |
| 166 | elapsed := time.Since(stats.startTime).Seconds() |
no test coverage detected