()
| 1879 | } |
| 1880 | |
| 1881 | func (vlog *valueLog) flushDiscardStats() { |
| 1882 | defer vlog.lfDiscardStats.closer.Done() |
| 1883 | |
| 1884 | mergeStats := func(stats map[uint32]int64) ([]byte, error) { |
| 1885 | vlog.lfDiscardStats.Lock() |
| 1886 | defer vlog.lfDiscardStats.Unlock() |
| 1887 | for fid, count := range stats { |
| 1888 | vlog.lfDiscardStats.m[fid] += count |
| 1889 | vlog.lfDiscardStats.updatesSinceFlush++ |
| 1890 | } |
| 1891 | |
| 1892 | if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold { |
| 1893 | encodedDS, err := json.Marshal(vlog.lfDiscardStats.m) |
| 1894 | if err != nil { |
| 1895 | return nil, err |
| 1896 | } |
| 1897 | vlog.lfDiscardStats.updatesSinceFlush = 0 |
| 1898 | return encodedDS, nil |
| 1899 | } |
| 1900 | return nil, nil |
| 1901 | } |
| 1902 | |
| 1903 | process := func(stats map[uint32]int64) error { |
| 1904 | encodedDS, err := mergeStats(stats) |
| 1905 | if err != nil || encodedDS == nil { |
| 1906 | return err |
| 1907 | } |
| 1908 | |
| 1909 | entries := []*Entry{{ |
| 1910 | Key: y.KeyWithTs(lfDiscardStatsKey, 1), |
| 1911 | Value: encodedDS, |
| 1912 | }} |
| 1913 | req, err := vlog.db.sendToWriteCh(entries) |
| 1914 | // No special handling of ErrBlockedWrites is required as err is just logged in |
| 1915 | // for loop below. |
| 1916 | if err != nil { |
| 1917 | return errors.Wrapf(err, "failed to push discard stats to write channel") |
| 1918 | } |
| 1919 | return req.Wait() |
| 1920 | } |
| 1921 | |
| 1922 | closer := vlog.lfDiscardStats.closer |
| 1923 | for { |
| 1924 | select { |
| 1925 | case <-closer.HasBeenClosed(): |
| 1926 | // For simplicity just return without processing already present in stats in flushChan. |
| 1927 | return |
| 1928 | case stats := <-vlog.lfDiscardStats.flushChan: |
| 1929 | if err := process(stats); err != nil { |
| 1930 | vlog.opt.Errorf("unable to process discardstats with error: %s", err) |
| 1931 | } |
| 1932 | } |
| 1933 | } |
| 1934 | } |
| 1935 | |
| 1936 | // populateDiscardStats populates vlog.lfDiscardStats. |
| 1937 | // This function will be called while initializing valueLog. |
no test coverage detected