processTask processes the query, accumulates and returns the result.
(ctx context.Context, q *pb.Query, gid uint32)
| 1010 | |
| 1011 | // processTask processes the query, accumulates and returns the result. |
| 1012 | func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) { |
| 1013 | safeAttr := x.SafeUTF8(q.Attr) |
| 1014 | ctx, span := otel.Tracer("").Start(ctx, "processTask."+safeAttr) |
| 1015 | defer span.End() |
| 1016 | |
| 1017 | stop := x.SpanTimer(span, "processTask"+safeAttr) |
| 1018 | defer stop() |
| 1019 | |
| 1020 | span.SetAttributes( |
| 1021 | attribute.String("startTs", fmt.Sprintf("%d", q.ReadTs)), |
| 1022 | attribute.String("node", fmt.Sprintf("%d", groups().Node.Id)), |
| 1023 | attribute.String("gid", fmt.Sprintf("%d", gid))) |
| 1024 | if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil { |
| 1025 | return nil, err |
| 1026 | } |
| 1027 | maxAssigned := posting.Oracle().MaxAssigned() |
| 1028 | span.AddEvent("Done waiting for maxAssigned", trace.WithAttributes( |
| 1029 | attribute.String("attr", q.Attr), |
| 1030 | attribute.String("readTs", fmt.Sprintf("%d", q.ReadTs)), |
| 1031 | attribute.String("max", fmt.Sprintf("%d", maxAssigned)))) |
| 1032 | if err := groups().ChecksumsMatch(ctx); err != nil { |
| 1033 | return nil, err |
| 1034 | } |
| 1035 | span.AddEvent("Done waiting for checksum match") |
| 1036 | |
| 1037 | // If a group stops serving tablet and it gets partitioned away from group |
| 1038 | // zero, then it wouldn't know that this group is no longer serving this |
| 1039 | // predicate. There's no issue if a we are serving a particular tablet and |
| 1040 | // we get partitioned away from group zero as long as it's not removed. |
| 1041 | // BelongsToReadOnly is called instead of BelongsTo to prevent this alpha |
| 1042 | // from requesting to serve this tablet. |
| 1043 | knownGid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs) |
| 1044 | switch { |
| 1045 | case err != nil: |
| 1046 | return nil, err |
| 1047 | case knownGid == 0: |
| 1048 | return nil, errNonExistentTablet |
| 1049 | case knownGid != groups().groupId(): |
| 1050 | return nil, errUnservedTablet |
| 1051 | } |
| 1052 | |
| 1053 | var qs queryState |
| 1054 | if q.Cache == UseTxnCache { |
| 1055 | qs.cache = posting.Oracle().CacheAt(q.ReadTs) |
| 1056 | } |
| 1057 | if qs.cache == nil { |
| 1058 | qs.cache = posting.NoCache(q.ReadTs) |
| 1059 | } |
| 1060 | // For now, remove the query level cache. It is causing contention for queries with high |
| 1061 | // fan-out. |
| 1062 | out, err := qs.helpProcessTask(ctx, q, gid) |
| 1063 | if err != nil { |
| 1064 | return nil, err |
| 1065 | } |
| 1066 | return out, nil |
| 1067 | } |
| 1068 | |
| 1069 | type queryState struct { |
no test coverage detected