listObjectsAIS reads the object list from all targets, combines and sorts the results, and returns the final list. Excess object entries from each target are remembered in the buffer (see: `queryBuffers`) so that the same objects are not requested again.
(bck *cluster.Bck, lsmsg *apc.ListObjsMsg)
| 1981 | // the final list. Excess of object entries from each target is remembered in the |
| 1982 | // buffer (see: `queryBuffers`) so we won't request the same objects again. |
| 1983 | func (p *proxy) listObjectsAIS(bck *cluster.Bck, lsmsg *apc.ListObjsMsg) (allEntries *cmn.BucketList, err error) { |
| 1984 | var ( |
| 1985 | aisMsg *aisMsg |
| 1986 | args *bcastArgs |
| 1987 | entries []*cmn.BucketEntry |
| 1988 | results sliceResults |
| 1989 | smap = p.owner.smap.get() |
| 1990 | cacheID = cacheReqID{bck: bck.Bucket(), prefix: lsmsg.Prefix} |
| 1991 | token = lsmsg.ContinuationToken |
| 1992 | props = lsmsg.PropsSet() |
| 1993 | hasEnough bool |
| 1994 | flags uint32 |
| 1995 | ) |
| 1996 | if lsmsg.PageSize == 0 { |
| 1997 | lsmsg.PageSize = apc.DefaultListPageSizeAIS |
| 1998 | } |
| 1999 | pageSize := lsmsg.PageSize |
| 2000 | |
| 2001 | // TODO: Before checking cache and buffer we should check if there is another |
| 2002 | // request already in-flight that requests the same page as we do - if yes |
| 2003 | // then we should just patiently wait for the cache to get populated. |
| 2004 | |
| 2005 | if lsmsg.IsFlagSet(apc.UseListObjsCache) { |
| 2006 | entries, hasEnough = p.qm.c.get(cacheID, token, pageSize) |
| 2007 | if hasEnough { |
| 2008 | goto end |
| 2009 | } |
| 2010 | // Request all the props (cache should always have all entries). |
| 2011 | lsmsg.AddProps(apc.GetPropsAll...) |
| 2012 | } |
| 2013 | entries, hasEnough = p.qm.b.get(lsmsg.UUID, token, pageSize) |
| 2014 | if hasEnough { |
| 2015 | // We have enough in the buffer to fulfill the request. |
| 2016 | goto endWithCache |
| 2017 | } |
| 2018 | |
| 2019 | // User requested some page but we don't have enough (but we may have part |
| 2020 | // of the full page). Therefore, we must ask targets for page starting from |
| 2021 | // what we have locally, so we don't re-request the objects. |
| 2022 | lsmsg.ContinuationToken = p.qm.b.last(lsmsg.UUID, token) |
| 2023 | |
| 2024 | aisMsg = p.newAmsgActVal(apc.ActList, &lsmsg) |
| 2025 | args = allocBcArgs() |
| 2026 | args.req = cmn.HreqArgs{ |
| 2027 | Method: http.MethodGet, |
| 2028 | Path: apc.URLPathBuckets.Join(bck.Name), |
| 2029 | Query: bck.AddToQuery(nil), |
| 2030 | Body: cos.MustMarshal(aisMsg), |
| 2031 | } |
| 2032 | args.timeout = apc.LongTimeout |
| 2033 | args.smap = smap |
| 2034 | args.cresv = cresBL{} // -> cmn.BucketList |
| 2035 | |
| 2036 | // Combine the results. |
| 2037 | results = p.bcastGroup(args) |
| 2038 | freeBcArgs(args) |
| 2039 | for _, res := range results { |
| 2040 | if res.err != nil { |
no test coverage detected