extractLocalShards iterates through files local to the current target and calls ExtractShard on matching files based on the given ParsedRequestSpec.
()
| 274 | // extractLocalShards iterates through files local to the current target and |
| 275 | // calls ExtractShard on matching files based on the given ParsedRequestSpec. |
| 276 | func (m *Manager) extractLocalShards() (err error) { |
| 277 | phaseInfo := &m.extractionPhase |
| 278 | |
| 279 | phaseInfo.adjuster.start() |
| 280 | defer phaseInfo.adjuster.stop() |
| 281 | |
| 282 | // Metrics |
| 283 | metrics := m.Metrics.Extraction |
| 284 | metrics.begin() |
| 285 | defer metrics.finish() |
| 286 | |
| 287 | metrics.mu.Lock() |
| 288 | metrics.TotalCnt = m.rs.InputFormat.Template.Count() |
| 289 | metrics.mu.Unlock() |
| 290 | |
| 291 | group, ctx := errgroup.WithContext(context.Background()) |
| 292 | pt := m.rs.InputFormat.Template |
| 293 | pt.InitIter() |
| 294 | |
| 295 | ExtractAllShards: |
| 296 | for name, hasNext := pt.Next(); hasNext; name, hasNext = pt.Next() { |
| 297 | select { |
| 298 | case <-m.listenAborted(): |
| 299 | group.Wait() |
| 300 | return newDSortAbortedError(m.ManagerUUID) |
| 301 | case <-ctx.Done(): |
| 302 | break ExtractAllShards // context was canceled, therefore we have an error |
| 303 | default: |
| 304 | } |
| 305 | |
| 306 | phaseInfo.adjuster.acquireGoroutineSema() |
| 307 | group.Go(m.extractShard(name, metrics)) |
| 308 | } |
| 309 | if err := group.Wait(); err != nil { |
| 310 | return err |
| 311 | } |
| 312 | |
| 313 | // We will no longer reserve any memory |
| 314 | m.dsorter.postExtraction() |
| 315 | |
| 316 | m.incrementRef(int64(m.recManager.Records.TotalObjectCount())) |
| 317 | return nil |
| 318 | } |
| 319 | |
| 320 | func (m *Manager) createShard(s *extract.Shard) (err error) { |
| 321 | var ( |
no test coverage detected