GetCompleteIndexSet returns the set of blobs forming a complete index set up to the provided epoch number.
(ctx context.Context, maxEpoch int)
| 793 | |
| 794 | // GetCompleteIndexSet returns the set of blobs forming a complete index set up to the provided epoch number. |
| 795 | func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob.Metadata, time.Time, error) { |
| 796 | for { |
| 797 | cs, err := e.committedState(ctx, 0) |
| 798 | if err != nil { |
| 799 | return nil, time.Time{}, err |
| 800 | } |
| 801 | |
| 802 | if maxEpoch == LatestEpoch { |
| 803 | maxEpoch = cs.WriteEpoch + 1 |
| 804 | } |
| 805 | |
| 806 | result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, 0, maxEpoch) |
| 807 | if e.timeFunc().Before(cs.ValidUntil) { |
| 808 | contentlog.Log4(ctx, e.log, |
| 809 | "complete index set", |
| 810 | logparam.Int("maxEpoch", maxEpoch), |
| 811 | logparam.Int("resultLength", len(result)), |
| 812 | blobparam.BlobIDList("result", blob.IDsFromMetadata(result)), |
| 813 | logparam.Time("deletionWatermark", cs.DeletionWatermark), |
| 814 | ) |
| 815 | |
| 816 | return result, cs.DeletionWatermark, err |
| 817 | } |
| 818 | |
| 819 | // We need to retry if local process took too long (e.g. because the machine went |
| 820 | // to sleep at the wrong moment) and committed state is no longer valid. |
| 821 | // |
| 822 | // One scenario where this matters is cleanup: if determining the set of indexes takes |
| 823 | // too long, it's possible for a cleanup process to delete some of the uncompacted |
| 824 | // indexes that are still treated as authoritative according to old committed state. |
| 825 | // |
| 826 | // Retrying will re-examine the state of the world and re-do the logic. |
| 827 | contentlog.Log(ctx, e.log, "GetCompleteIndexSet took too long, retrying to ensure correctness") |
| 828 | atomic.AddInt32(e.getCompleteIndexSetTooSlow, 1) |
| 829 | } |
| 830 | } |
| 831 | |
| 832 | var errWriteIndexTryAgain = errors.New("try again") |
| 833 |