| 169 | } |
| 170 | |
| 171 | func (dm *DMap) atomicIncrByFloat(e *env, delta float64) (float64, error) { |
| 172 | atomicKey := e.dmap + e.key |
| 173 | dm.s.locker.Lock(atomicKey) |
| 174 | defer func() { |
| 175 | err := dm.s.locker.Unlock(atomicKey) |
| 176 | if err != nil { |
| 177 | dm.s.log.V(3).Printf("[ERROR] Failed to release the fine grained lock for key: %s on DMap: %s: %v", e.key, e.dmap, err) |
| 178 | } |
| 179 | }() |
| 180 | |
| 181 | var current float64 |
| 182 | entry, err := dm.Get(e.ctx, e.key) |
| 183 | if errors.Is(err, ErrKeyNotFound) { |
| 184 | err = nil |
| 185 | } |
| 186 | if err != nil { |
| 187 | return 0, err |
| 188 | } |
| 189 | |
| 190 | if entry != nil { |
| 191 | current, err = util.ParseFloat(entry.Value(), 64) |
| 192 | if err != nil { |
| 193 | return 0, err |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | latest := current + delta |
| 198 | if err != nil { |
| 199 | return 0, err |
| 200 | } |
| 201 | |
| 202 | valueBuf := pool.Get() |
| 203 | defer pool.Put(valueBuf) |
| 204 | |
| 205 | enc := resp.New(valueBuf) |
| 206 | err = enc.Encode(latest) |
| 207 | if err != nil { |
| 208 | return 0, err |
| 209 | } |
| 210 | e.value = valueBuf.Bytes() |
| 211 | e.value = make([]byte, valueBuf.Len()) |
| 212 | copy(e.value, valueBuf.Bytes()) |
| 213 | |
| 214 | err = dm.put(e) |
| 215 | if err != nil { |
| 216 | return 0, err |
| 217 | } |
| 218 | |
| 219 | return latest, nil |
| 220 | } |
| 221 | |
| 222 | // IncrByFloat atomically increments key by delta. The return value is the new value after being incremented or an error. |
| 223 | func (dm *DMap) IncrByFloat(ctx context.Context, key string, delta float64) (float64, error) { |