(ctx context.Context, arg funcArgs)
| 1621 | } |
| 1622 | |
| 1623 | func (qs *queryState) filterGeoFunction(ctx context.Context, arg funcArgs) error { |
| 1624 | span := trace.SpanFromContext(ctx) |
| 1625 | stop := x.SpanTimer(span, "filterGeoFunction") |
| 1626 | defer stop() |
| 1627 | |
| 1628 | attr := arg.q.Attr |
| 1629 | uids := algo.MergeSorted(arg.out.UidMatrix) |
| 1630 | numGo, width := x.DivideAndRule(len(uids.Uids)) |
| 1631 | span.AddEvent("Parallel processing details", trace.WithAttributes( |
| 1632 | attribute.Int("uid_count", len(uids.Uids)), |
| 1633 | attribute.Int("num_go", numGo), |
| 1634 | attribute.Int("width", width))) |
| 1635 | |
| 1636 | eg, egCtx := errgroup.WithContext(ctx) |
| 1637 | filtered := make([]*pb.List, numGo) |
| 1638 | filter := func(idx, start, end int) error { |
| 1639 | filtered[idx] = &pb.List{} |
| 1640 | out := filtered[idx] |
| 1641 | for i := start; i < end; i++ { |
| 1642 | uid := uids.Uids[i] |
| 1643 | if i%100 == 0 { |
| 1644 | select { |
| 1645 | case <-egCtx.Done(): |
| 1646 | return egCtx.Err() |
| 1647 | default: |
| 1648 | } |
| 1649 | } |
| 1650 | pl, err := qs.cache.Get(x.DataKey(attr, uid)) |
| 1651 | if err != nil { |
| 1652 | return err |
| 1653 | } |
| 1654 | var tv pb.TaskValue |
| 1655 | err = pl.Iterate(arg.q.ReadTs, 0, func(p *pb.Posting) error { |
| 1656 | tv.ValType = p.ValType |
| 1657 | tv.Val = p.Value |
| 1658 | if types.MatchGeo(&tv, arg.srcFn.geoQuery) { |
| 1659 | out.Uids = append(out.Uids, uid) |
| 1660 | return posting.ErrStopIteration |
| 1661 | } |
| 1662 | return nil |
| 1663 | }) |
| 1664 | if err != nil { |
| 1665 | return err |
| 1666 | } |
| 1667 | } |
| 1668 | return nil |
| 1669 | } |
| 1670 | |
| 1671 | for i := range numGo { |
| 1672 | start := i * width |
| 1673 | end := start + width |
| 1674 | if end > len(uids.Uids) { |
| 1675 | end = len(uids.Uids) |
| 1676 | } |
| 1677 | idx := i |
| 1678 | eg.Go(func() error { |
| 1679 | return filter(idx, start, end) |
| 1680 | }) |
no test coverage detected