ServeTask is used to respond to a query.
(ctx context.Context, q *pb.Query)
| 2222 | |
| 2223 | // ServeTask is used to respond to a query. |
| 2224 | func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, error) { |
| 2225 | // Manually extract trace context from gRPC metadata using the propagator |
| 2226 | // This ensures the trace context is properly extracted for cross-alpha tracing |
| 2227 | ctx = x.ExtractTraceContext(ctx) |
| 2228 | |
| 2229 | // Sanitize attr for span name to ensure valid UTF-8 for OTLP export |
| 2230 | safeAttr := x.SafeUTF8(q.Attr) |
| 2231 | ctx, span := otel.Tracer("").Start(ctx, "worker.ServeTask", |
| 2232 | trace.WithAttributes(attribute.String("predicate", safeAttr))) |
| 2233 | defer span.End() |
| 2234 | |
| 2235 | if ctx.Err() != nil { |
| 2236 | return nil, ctx.Err() |
| 2237 | } |
| 2238 | |
| 2239 | // It could be possible that the server isn't ready but a peer sends a |
| 2240 | // request. In that case we should check for the health here. |
| 2241 | if err := x.HealthCheck(); err != nil { |
| 2242 | return nil, err |
| 2243 | } |
| 2244 | |
| 2245 | gid, err := groups().BelongsToReadOnly(q.Attr, q.ReadTs) |
| 2246 | switch { |
| 2247 | case err != nil: |
| 2248 | return nil, err |
| 2249 | case gid == 0: |
| 2250 | return nil, errNonExistentTablet |
| 2251 | case gid != groups().groupId(): |
| 2252 | return nil, errUnservedTablet |
| 2253 | } |
| 2254 | |
| 2255 | var numUids int |
| 2256 | if q.UidList != nil { |
| 2257 | numUids = len(q.UidList.Uids) |
| 2258 | } |
| 2259 | span.AddEvent("ServeTask details", trace.WithAttributes( |
| 2260 | attribute.String("attr", q.Attr), |
| 2261 | attribute.Int("num_uids", numUids), |
| 2262 | attribute.Int64("group_id", int64(gid)))) |
| 2263 | |
| 2264 | if !groups().ServesGroup(gid) { |
| 2265 | return nil, errors.Errorf( |
| 2266 | "Temporary error, attr: %q groupId: %v Request sent to wrong server", |
| 2267 | x.ParseAttr(q.Attr), gid) |
| 2268 | } |
| 2269 | |
| 2270 | type reply struct { |
| 2271 | result *pb.Result |
| 2272 | err error |
| 2273 | } |
| 2274 | c := make(chan reply, 1) |
| 2275 | go func() { |
| 2276 | result, err := processTask(ctx, q, gid) |
| 2277 | c <- reply{result, err} |
| 2278 | }() |
| 2279 | |
| 2280 | select { |
| 2281 | case <-ctx.Done(): |
Nothing calls this function directly.
No test coverage was detected for it.