| 795 | } |
| 796 | |
| 797 | func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) { |
| 798 | if atomic.LoadInt32(&db.blockWrites) == 1 { |
| 799 | return nil, ErrBlockedWrites |
| 800 | } |
| 801 | var count, size int64 |
| 802 | for _, e := range entries { |
| 803 | size += int64(e.estimateSize(db.opt.ValueThreshold)) |
| 804 | count++ |
| 805 | } |
| 806 | if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize { |
| 807 | return nil, ErrTxnTooBig |
| 808 | } |
| 809 | |
| 810 | // We can only service one request because we need each txn to be stored in a contigous section. |
| 811 | // Txns should not interleave among other txns or rewrites. |
| 812 | req := requestPool.Get().(*request) |
| 813 | req.reset() |
| 814 | req.Entries = entries |
| 815 | req.Wg.Add(1) |
| 816 | req.IncrRef() // for db write |
| 817 | db.writeCh <- req // Handled in doWrites. |
| 818 | y.NumPuts.Add(int64(len(entries))) |
| 819 | |
| 820 | return req, nil |
| 821 | } |
| 822 | |
| 823 | func (db *DB) doWrites(lc *y.Closer) { |
| 824 | defer lc.Done() |