write is thread-unsafe by design and should not be called concurrently.
(reqs []*request)
| 1405 | |
| 1406 | // write is thread-unsafe by design and should not be called concurrently. |
| 1407 | func (vlog *valueLog) write(reqs []*request) error { |
| 1408 | if vlog.db.opt.InMemory { |
| 1409 | return nil |
| 1410 | } |
| 1411 | // Validate writes before writing to vlog. Because, we don't want to partially write and return |
| 1412 | // an error. |
| 1413 | if err := vlog.validateWrites(reqs); err != nil { |
| 1414 | return err |
| 1415 | } |
| 1416 | |
| 1417 | vlog.filesLock.RLock() |
| 1418 | maxFid := vlog.maxFid |
| 1419 | curlf := vlog.filesMap[maxFid] |
| 1420 | vlog.filesLock.RUnlock() |
| 1421 | |
| 1422 | var buf bytes.Buffer |
| 1423 | flushWrites := func() error { |
| 1424 | if buf.Len() == 0 { |
| 1425 | return nil |
| 1426 | } |
| 1427 | vlog.opt.Debugf("Flushing buffer of size %d to vlog", buf.Len()) |
| 1428 | n, err := curlf.fd.Write(buf.Bytes()) |
| 1429 | if err != nil { |
| 1430 | return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path) |
| 1431 | } |
| 1432 | buf.Reset() |
| 1433 | y.NumWrites.Add(1) |
| 1434 | y.NumBytesWritten.Add(int64(n)) |
| 1435 | vlog.opt.Debugf("Done") |
| 1436 | atomic.AddUint32(&vlog.writableLogOffset, uint32(n)) |
| 1437 | atomic.StoreUint32(&curlf.size, vlog.writableLogOffset) |
| 1438 | return nil |
| 1439 | } |
| 1440 | toDisk := func() error { |
| 1441 | if err := flushWrites(); err != nil { |
| 1442 | return err |
| 1443 | } |
| 1444 | if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) || |
| 1445 | vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries { |
| 1446 | if err := curlf.doneWriting(vlog.woffset()); err != nil { |
| 1447 | return err |
| 1448 | } |
| 1449 | |
| 1450 | newid := vlog.maxFid + 1 |
| 1451 | y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid) |
| 1452 | newlf, err := vlog.createVlogFile(newid) |
| 1453 | if err != nil { |
| 1454 | return err |
| 1455 | } |
| 1456 | curlf = newlf |
| 1457 | atomic.AddInt32(&vlog.db.logRotates, 1) |
| 1458 | } |
| 1459 | return nil |
| 1460 | } |
| 1461 | for i := range reqs { |
| 1462 | b := reqs[i] |
| 1463 | b.Ptrs = b.Ptrs[:0] |
| 1464 | var written int |