| 178 | } |
| 179 | |
| 180 | func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { |
| 181 | defer func() { |
| 182 | if err == nil { |
| 183 | c.adds.Inc() |
| 184 | } |
| 185 | }() |
| 186 | |
| 187 | c.isRunningMtx.Lock() |
| 188 | defer c.isRunningMtx.Unlock() |
| 189 | |
| 190 | if !c.isRunning { |
| 191 | return errors.New("queue is not running") |
| 192 | } |
| 193 | |
| 194 | c.chunkRefMapMtx.Lock() |
| 195 | c.chunkRefMap[job.ref] = job.chk |
| 196 | |
| 197 | // Keep track of the peak usage of c.chunkRefMap. |
| 198 | if len(c.chunkRefMap) > c.chunkRefMapPeakSize { |
| 199 | c.chunkRefMapPeakSize = len(c.chunkRefMap) |
| 200 | } |
| 201 | c.chunkRefMapMtx.Unlock() |
| 202 | |
| 203 | if ok := c.jobs.push(job); !ok { |
| 204 | c.chunkRefMapMtx.Lock() |
| 205 | delete(c.chunkRefMap, job.ref) |
| 206 | c.chunkRefMapMtx.Unlock() |
| 207 | |
| 208 | return errors.New("queue is closed") |
| 209 | } |
| 210 | |
| 211 | return nil |
| 212 | } |
| 213 | |
| 214 | func (c *chunkWriteQueue) get(ref ChunkDiskMapperRef) chunkenc.Chunk { |
| 215 | c.chunkRefMapMtx.RLock() |