MCPcopy Index your code
hub / github.com/dgraph-io/dgraph / processTask

Function processTask

worker/task.go:1012–1067  ·  view source on GitHub ↗

processTask processes the query, accumulates and returns the result.

(ctx context.Context, q *pb.Query, gid uint32)

Source from the content-addressed store, hash-verified

1010
1011// processTask processes the query, accumulates and returns the result.
1012func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) {
1013 safeAttr := x.SafeUTF8(q.Attr)
1014 ctx, span := otel.Tracer("").Start(ctx, "processTask."+safeAttr)
1015 defer span.End()
1016
1017 stop := x.SpanTimer(span, "processTask"+safeAttr)
1018 defer stop()
1019
1020 span.SetAttributes(
1021 attribute.String("startTs", fmt.Sprintf("%d", q.ReadTs)),
1022 attribute.String("node", fmt.Sprintf("%d", groups().Node.Id)),
1023 attribute.String("gid", fmt.Sprintf("%d", gid)))
1024 if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
1025 return nil, err
1026 }
1027 maxAssigned := posting.Oracle().MaxAssigned()
1028 span.AddEvent("Done waiting for maxAssigned", trace.WithAttributes(
1029 attribute.String("attr", q.Attr),
1030 attribute.String("readTs", fmt.Sprintf("%d", q.ReadTs)),
1031 attribute.String("max", fmt.Sprintf("%d", maxAssigned))))
1032 if err := groups().ChecksumsMatch(ctx); err != nil {
1033 return nil, err
1034 }
1035 span.AddEvent("Done waiting for checksum match")
1036
1037 // If a group stops serving tablet and it gets partitioned away from group
1038 // zero, then it wouldn't know that this group is no longer serving this
1039 // predicate. There's no issue if a we are serving a particular tablet and
1040 // we get partitioned away from group zero as long as it's not removed.
1041 // BelongsToReadOnly is called instead of BelongsTo to prevent this alpha
1042 // from requesting to serve this tablet.
1043 knownGid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs)
1044 switch {
1045 case err != nil:
1046 return nil, err
1047 case knownGid == 0:
1048 return nil, errNonExistentTablet
1049 case knownGid != groups().groupId():
1050 return nil, errUnservedTablet
1051 }
1052
1053 var qs queryState
1054 if q.Cache == UseTxnCache {
1055 qs.cache = posting.Oracle().CacheAt(q.ReadTs)
1056 }
1057 if qs.cache == nil {
1058 qs.cache = posting.NoCache(q.ReadTs)
1059 }
1060 // For now, remove the query level cache. It is causing contention for queries with high
1061 // fan-out.
1062 out, err := qs.helpProcessTask(ctx, q, gid)
1063 if err != nil {
1064 return nil, err
1065 }
1066 return out, nil
1067}
1068
1069type queryState struct {

Callers 2

ProcessTaskOverNetworkFunction · 0.85
ServeTaskMethod · 0.85

Calls 14

helpProcessTaskMethod · 0.95
SafeUTF8Function · 0.92
SpanTimerFunction · 0.92
OracleFunction · 0.92
NoCacheFunction · 0.92
groupsFunction · 0.85
WaitForTsMethod · 0.80
MaxAssignedMethod · 0.80
ChecksumsMatchMethod · 0.80
BelongsToReadOnlyMethod · 0.80
groupIdMethod · 0.80
CacheAtMethod · 0.80

Tested by

no test coverage detected