MCPcopy
hub / github.com/dgraph-io/badger / commitAndSend

Method commitAndSend

txn.go:525–613  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

523}
524
525func (txn *Txn) commitAndSend() (func() error, error) {
526 orc := txn.db.orc
527 // Ensure that the order in which we get the commit timestamp is the same as
528 // the order in which we push these updates to the write channel. So, we
529 // acquire a writeChLock before getting a commit timestamp, and only release
530 // it after pushing the entries to it.
531 orc.writeChLock.Lock()
532 defer orc.writeChLock.Unlock()
533
534 commitTs := orc.newCommitTs(txn)
535 // The commitTs can be zero if the transaction is running in managed mode.
536 // Individual entries might have their own timestamps.
537 if commitTs == 0 && !txn.db.opt.managedTxns {
538 return nil, ErrConflict
539 }
540
541 keepTogether := true
542 setVersion := func(e *Entry) {
543 if e.version == 0 {
544 e.version = commitTs
545 } else {
546 keepTogether = false
547 }
548 }
549 for _, e := range txn.pendingWrites {
550 setVersion(e)
551 }
552 // The duplicateWrites slice will be non-empty only if there are duplicate
553 // entries with different versions.
554 for _, e := range txn.duplicateWrites {
555 setVersion(e)
556 }
557
558 entries := make([]*Entry, 0, len(txn.pendingWrites)+len(txn.duplicateWrites)+1)
559
560 processEntry := func(e *Entry) {
561 // Suffix the keys with commit ts, so the key versions are sorted in
562 // descending order of commit timestamp.
563 e.Key = y.KeyWithTs(e.Key, e.version)
564 // Add bitTxn only if these entries are part of a transaction. We
565 // support SetEntryAt(..) in managed mode which means a single
566 // transaction can have entries with different timestamps. If entries
567 // in a single transaction have different timestamps, we don't add the
568 // transaction markers.
569 if keepTogether {
570 e.meta |= bitTxn
571 }
572 entries = append(entries, e)
573 }
574
575 // The following debug information is what led to determining the cause of
576 // bank txn violation bug, and it took a whole bunch of effort to narrow it
577 // down to here. So, keep this around for at least a couple of months.
578 // var b strings.Builder
579 // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
580 // txn.readTs, commitTs, txn.reads, txn.conflictKeys)
581 for _, e := range txn.pendingWrites {
582 processEntry(e)

Callers 2

CommitMethod · 0.95
CommitWithMethod · 0.95

Calls 6

KeyWithTsFunction · 0.92
AssertTrueFunction · 0.92
newCommitTsMethod · 0.80
sendToWriteChMethod · 0.80
doneCommitMethod · 0.80
WaitMethod · 0.45

Tested by

no test coverage detected