(name string, metrics *LocalExtraction)
| 158 | } |
| 159 | |
| 160 | func (m *Manager) extractShard(name string, metrics *LocalExtraction) func() error { |
| 161 | return func() error { |
| 162 | var ( |
| 163 | warnPossibleOOM bool |
| 164 | estimateTotalRecordsSize uint64 |
| 165 | phaseInfo = &m.extractionPhase |
| 166 | ) |
| 167 | |
| 168 | defer phaseInfo.adjuster.releaseGoroutineSema() |
| 169 | |
| 170 | shardName := name + m.rs.Extension |
| 171 | lom := cluster.AllocLOM(shardName) |
| 172 | defer cluster.FreeLOM(lom) |
| 173 | if err := lom.InitBck(&m.rs.Bck); err != nil { |
| 174 | return err |
| 175 | } |
| 176 | _, local, err := lom.HrwTarget(m.smap) |
| 177 | if err != nil { |
| 178 | return err |
| 179 | } |
| 180 | if !local { |
| 181 | return nil |
| 182 | } |
| 183 | if err = lom.Load(false /*cache it*/, false /*locked*/); err != nil { |
| 184 | if cmn.IsErrObjNought(err) { |
| 185 | msg := fmt.Sprintf("shard %q does not exist (is missing)", shardName) |
| 186 | return m.react(m.rs.MissingShards, msg) |
| 187 | } |
| 188 | return err |
| 189 | } |
| 190 | |
| 191 | phaseInfo.adjuster.acquireSema(lom.MpathInfo()) |
| 192 | if m.aborted() { |
| 193 | phaseInfo.adjuster.releaseSema(lom.MpathInfo()) |
| 194 | return newDSortAbortedError(m.ManagerUUID) |
| 195 | } |
| 196 | // |
| 197 | // FIXME: check capacity *prior* to starting |
| 198 | // |
| 199 | if cs := fs.GetCapStatus(); cs.Err != nil { |
| 200 | phaseInfo.adjuster.releaseSema(lom.MpathInfo()) |
| 201 | return cs.Err |
| 202 | } |
| 203 | |
| 204 | lom.Lock(false) |
| 205 | f, err := os.Open(lom.FQN) |
| 206 | if err != nil { |
| 207 | phaseInfo.adjuster.releaseSema(lom.MpathInfo()) |
| 208 | lom.Unlock(false) |
| 209 | return errors.Errorf("unable to open local file, err: %v", err) |
| 210 | } |
| 211 | var compressedSize int64 |
| 212 | if m.extractCreator.UsingCompression() { |
| 213 | compressedSize = lom.SizeBytes() |
| 214 | } |
| 215 | |
| 216 | expectedUncompressedSize := uint64(float64(lom.SizeBytes()) / m.avgCompressionRatio()) |
| 217 | toDisk := m.dsorter.preShardExtraction(expectedUncompressedSize) |
no test coverage detected