()
| 523 | } |
| 524 | |
| 525 | func (txn *Txn) commitAndSend() (func() error, error) { |
| 526 | orc := txn.db.orc |
| 527 | // Ensure that the order in which we get the commit timestamp is the same as |
| 528 | // the order in which we push these updates to the write channel. So, we |
| 529 | // acquire a writeChLock before getting a commit timestamp, and only release |
| 530 | // it after pushing the entries to it. |
| 531 | orc.writeChLock.Lock() |
| 532 | defer orc.writeChLock.Unlock() |
| 533 | |
| 534 | commitTs := orc.newCommitTs(txn) |
| 535 | // The commitTs can be zero if the transaction is running in managed mode. |
| 536 | // Individual entries might have their own timestamps. |
| 537 | if commitTs == 0 && !txn.db.opt.managedTxns { |
| 538 | return nil, ErrConflict |
| 539 | } |
| 540 | |
| 541 | keepTogether := true |
| 542 | setVersion := func(e *Entry) { |
| 543 | if e.version == 0 { |
| 544 | e.version = commitTs |
| 545 | } else { |
| 546 | keepTogether = false |
| 547 | } |
| 548 | } |
| 549 | for _, e := range txn.pendingWrites { |
| 550 | setVersion(e) |
| 551 | } |
| 552 | // The duplicateWrites slice will be non-empty only if there are duplicate |
| 553 | // entries with different versions. |
| 554 | for _, e := range txn.duplicateWrites { |
| 555 | setVersion(e) |
| 556 | } |
| 557 | |
| 558 | entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1) |
| 559 | |
| 560 | processEntry := func(e *Entry) { |
| 561 | // Suffix the keys with commit ts, so the key versions are sorted in |
| 562 | // descending order of commit timestamp. |
| 563 | e.Key = y.KeyWithTs(e.Key, e.version) |
| 564 | // Add bitTxn only if these entries are part of a transaction. We |
| 565 | // support SetEntryAt(..) in managed mode which means a single |
| 566 | // transaction can have entries with different timestamps. If entries |
| 567 | // in a single transaction have different timestamps, we don't add the |
| 568 | // transaction markers. |
| 569 | if keepTogether { |
| 570 | e.meta |= bitTxn |
| 571 | } |
| 572 | entries = append(entries, e) |
| 573 | } |
| 574 | |
| 575 | // The following debug information is what led to determining the cause of |
| 576 | // bank txn violation bug, and it took a whole bunch of effort to narrow it |
| 577 | // down to here. So, keep this around for at least a couple of months. |
| 578 | // var b strings.Builder |
| 579 | // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ", |
| 580 | // txn.readTs, commitTs, txn.reads, txn.conflictKeys) |
| 581 | for _, e := range txn.pendingWrites { |
| 582 | processEntry(e) |
no test coverage detected