| 118 | } |
| 119 | |
| 120 | func (wb *WriteBatch) commit() error { |
| 121 | if err := wb.Error(); err != nil { |
| 122 | return err |
| 123 | } |
| 124 | |
| 125 | if wb.finished { |
| 126 | return ErrCommitAfterFinish |
| 127 | } |
| 128 | |
| 129 | if err := wb.throttle.Do(); err != nil { |
| 130 | wb.err.Store(err) |
| 131 | return err |
| 132 | } |
| 133 | |
| 134 | // Record the current tx ID before async commit |
| 135 | committingTxID := wb.txID |
| 136 | |
| 137 | // Async commit, callback will run in goroutine |
| 138 | wb.tx.CommitWith(func(cbErr error) { |
| 139 | defer wb.throttle.Done(cbErr) |
| 140 | if cbErr != nil { |
| 141 | wb.err.Store(cbErr) |
| 142 | } |
| 143 | }) |
| 144 | |
| 145 | // Create new tx for next batch operations via TransactionManager |
| 146 | // needLock=false because Put()/Delete() will acquire lock before operations |
| 147 | var err error |
| 148 | wb.tx, err = wb.db.transactionMgr.BeginTx(true, false) |
| 149 | if err != nil { |
| 150 | // Even if we cannot open a new transaction (e.g., during shutdown), |
| 151 | // ensure the committing transaction is unregistered so shutdown logic |
| 152 | // does not wait on it forever. |
| 153 | wb.db.transactionMgr.UnregisterTx(committingTxID) |
| 154 | return err |
| 155 | } |
| 156 | wb.txID = wb.tx.id |
| 157 | |
| 158 | // Unregister the committed transaction |
| 159 | wb.db.transactionMgr.UnregisterTx(committingTxID) |
| 160 | |
| 161 | return wb.Error() |
| 162 | } |
| 163 | |
| 164 | func (wb *WriteBatch) Flush() error { |
| 165 | wb.Lock() |