()
| 1589 | } |
| 1590 | |
| 1591 | func (apfl *AsyncPagesFileLoad) main() { |
| 1592 | defer func() { |
| 1593 | // Close ar first since this synchronously stops inflight I/O. |
| 1594 | if err := apfl.ar.Close(); err != nil { |
| 1595 | // Completed reads are complete irrespective of err, so log err |
| 1596 | // rather than propagating it. |
| 1597 | log.Warningf("Async page loading: stateio.AsyncReader.Close failed: %v", err) |
| 1598 | } |
| 1599 | apfl.timeline.End() |
| 1600 | // Wake up any remaining waiters so that they can observe apfl.err(). |
| 1601 | // Leave all segments in asyncMemoryFileLoad.unloaded so that new |
| 1602 | // callers of awaitLoad() will still observe the correct (permanently |
| 1603 | // unloaded) segments. |
| 1604 | apfl.amflsMu.Lock() |
| 1605 | apfl.mu.Lock() |
| 1606 | for amfl := apfl.amfls.Front(); amfl != nil; amfl = amfl.Next() { |
| 1607 | for ulseg := amfl.unloaded.FirstSegment(); ulseg.Ok(); ulseg = ulseg.NextSegment() { |
| 1608 | ul := ulseg.ValuePtr() |
| 1609 | ullen := ulseg.Range().Length() |
| 1610 | for _, w := range ul.waiters { |
| 1611 | w.pending -= ullen |
| 1612 | if w.pending == 0 { |
| 1613 | w.wakeup.Notify(1) |
| 1614 | } |
| 1615 | } |
| 1616 | ul.started = false |
| 1617 | ul.waiters = nil |
| 1618 | } |
| 1619 | if amfl.doneCallback != nil { |
| 1620 | amfl.doneCallback(apfl.err()) |
| 1621 | amfl.doneCallback = nil |
| 1622 | } |
| 1623 | } |
| 1624 | apfl.mu.Unlock() |
| 1625 | apfl.amflsMu.Unlock() |
| 1626 | if apfl.doneCallback != nil { |
| 1627 | apfl.doneCallback(apfl.err()) |
| 1628 | } |
| 1629 | }() |
| 1630 | |
| 1631 | maxParallel := apfl.ar.MaxParallel() |
| 1632 | // Storage reused between main loop iterations: |
| 1633 | var completions []stateio.Completion |
| 1634 | var wakeups []*aplWaiter |
| 1635 | var decRefs []aplFileRange |
| 1636 | |
| 1637 | dropDelayedDecRefs := func() { |
| 1638 | if len(decRefs) != 0 { |
| 1639 | for _, fr := range decRefs { |
| 1640 | fr.amfl.f.DecRef(fr.FileRange) |
| 1641 | } |
| 1642 | decRefs = decRefs[:0] |
| 1643 | } |
| 1644 | } |
| 1645 | defer dropDelayedDecRefs() |
| 1646 | |
| 1647 | // Don't start timing until we have pages to load. |
| 1648 | apfl.lfStatus.Wait() |
no test coverage detected