MCPcopy
hub / github.com/NVIDIA/aistore / extractLocalShards

Method extractLocalShards

dsort/dsort.go:276–318  ·  view source on GitHub ↗

extractLocalShards iterates through files local to the current target and calls ExtractShard on matching files based on the given ParsedRequestSpec.

()

Source from the content-addressed store, hash-verified

274// extractLocalShards iterates through files local to the current target and
275// calls ExtractShard on matching files based on the given ParsedRequestSpec.
276func (m *Manager) extractLocalShards() (err error) {
277 phaseInfo := &m.extractionPhase
278
279 phaseInfo.adjuster.start()
280 defer phaseInfo.adjuster.stop()
281
282 // Metrics
283 metrics := m.Metrics.Extraction
284 metrics.begin()
285 defer metrics.finish()
286
287 metrics.mu.Lock()
288 metrics.TotalCnt = m.rs.InputFormat.Template.Count()
289 metrics.mu.Unlock()
290
291 group, ctx := errgroup.WithContext(context.Background())
292 pt := m.rs.InputFormat.Template
293 pt.InitIter()
294
295ExtractAllShards:
296 for name, hasNext := pt.Next(); hasNext; name, hasNext = pt.Next() {
297 select {
298 case <-m.listenAborted():
299 group.Wait()
300 return newDSortAbortedError(m.ManagerUUID)
301 case <-ctx.Done():
302 break ExtractAllShards // context was canceled, therefore we have an error
303 default:
304 }
305
306 phaseInfo.adjuster.acquireGoroutineSema()
307 group.Go(m.extractShard(name, metrics))
308 }
309 if err := group.Wait(); err != nil {
310 return err
311 }
312
313 // We will no longer reserve any memory
314 m.dsorter.postExtraction()
315
316 m.incrementRef(int64(m.recManager.Records.TotalObjectCount()))
317 return nil
318}
319
320func (m *Manager) createShard(s *extract.Shard) (err error) {
321 var (

Callers 1

startMethod · 0.95

Calls 15

listenAbortedMethod · 0.95
extractShardMethod · 0.95
incrementRefMethod · 0.95
newDSortAbortedErrorFunction · 0.85
WithContextMethod · 0.80
InitIterMethod · 0.80
acquireGoroutineSemaMethod · 0.80
TotalObjectCountMethod · 0.80
startMethod · 0.65
LockMethod · 0.65
UnlockMethod · 0.65
WaitMethod · 0.65

Tested by

no test coverage detected