CommitToDisk commits a transaction to disk. This function only stores deltas to the commit timestamps. It does not try to generate a state. State generation is done via rollups, which happen when a snapshot is created. Don't call this for schema mutations. Directly commit them.
(writer *TxnWriter, commitTs uint64)
| 264 | // State generation is done via rollups, which happen when a snapshot is created. |
| 265 | // Don't call this for schema mutations. Directly commit them. |
| 266 | func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { |
| 267 | if commitTs == 0 { |
| 268 | return nil |
| 269 | } |
| 270 | |
| 271 | cache := txn.cache |
| 272 | cache.Lock() |
| 273 | defer cache.Unlock() |
| 274 | |
| 275 | var keys []string |
| 276 | for key := range cache.deltas { |
| 277 | keys = append(keys, key) |
| 278 | } |
| 279 | |
| 280 | defer func() { |
| 281 | // Add these keys to be rolled up after we're done writing. This is the right place for them |
| 282 | // to be rolled up, because we just pushed these deltas over to Badger. |
| 283 | for _, key := range keys { |
| 284 | IncrRollup.addKeyToBatch([]byte(key), 1) |
| 285 | } |
| 286 | }() |
| 287 | |
| 288 | var idx int |
| 289 | for idx < len(keys) { |
| 290 | // writer.update can return early from the loop in case we encounter badger.ErrTxnTooBig. On |
| 291 | // that error, writer.update would still commit the transaction and return any error. If |
| 292 | // nil, we continue to process the remaining keys. |
| 293 | err := writer.update(commitTs, func(btxn *badger.Txn) error { |
| 294 | for ; idx < len(keys); idx++ { |
| 295 | key := keys[idx] |
| 296 | data := cache.deltas[key] |
| 297 | if len(data) == 0 { |
| 298 | continue |
| 299 | } |
| 300 | if ts := cache.maxVersions[key]; ts >= commitTs { |
| 301 | // Skip write because we already have a write at a higher ts. |
| 302 | // Logging here can cause a lot of output when doing Raft log replay. So, let's |
| 303 | // not output anything here. |
| 304 | continue |
| 305 | } |
| 306 | err := btxn.SetEntry(&badger.Entry{ |
| 307 | Key: []byte(key), |
| 308 | Value: data, |
| 309 | UserMeta: BitDeltaPosting, |
| 310 | }) |
| 311 | if err != nil { |
| 312 | return err |
| 313 | } |
| 314 | } |
| 315 | return nil |
| 316 | }) |
| 317 | if err != nil { |
| 318 | return err |
| 319 | } |
| 320 | } |
| 321 | return nil |
| 322 | } |
| 323 |