ProcessTaskOverNetwork processes the query and returns the result from the instance that stores the posting list corresponding to the predicate in the query.
(ctx context.Context, q *pb.Query)
| 121 | // the instance which stores posting list corresponding to the predicate in the |
| 122 | // query. |
| 123 | func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) { |
| 124 | attr := q.Attr |
| 125 | gid, err := groups().BelongsToReadOnly(attr, q.ReadTs) |
| 126 | switch { |
| 127 | case err != nil: |
| 128 | return nil, err |
| 129 | case gid == 0: |
| 130 | return nil, errNonExistentTablet |
| 131 | } |
| 132 | |
| 133 | span := trace.SpanFromContext(ctx) |
| 134 | span.AddEvent("ProcessTaskOverNetwork", trace.WithAttributes( |
| 135 | attribute.String("attr", attr), |
| 136 | attribute.String("gid", fmt.Sprintf("%d", gid)), |
| 137 | attribute.String("readTs", fmt.Sprintf("%d", q.ReadTs)), |
| 138 | attribute.String("node_id", fmt.Sprintf("%d", groups().Node.Id)))) |
| 139 | |
| 140 | if groups().ServesGroup(gid) { |
| 141 | // No need for a network call, as this should be run from within this instance. |
| 142 | return processTask(ctx, q, gid) |
| 143 | } |
| 144 | |
| 145 | // Add span for cross-alpha network call |
| 146 | ctx, networkSpan := otel.Tracer("").Start(ctx, "ProcessTaskOverNetwork.RemoteCall") |
| 147 | networkSpan.SetAttributes( |
| 148 | attribute.String("target_group", fmt.Sprintf("%d", gid)), |
| 149 | attribute.String("predicate", x.SafeUTF8(attr)), |
| 150 | attribute.Bool("is_remote", true), |
| 151 | ) |
| 152 | |
| 153 | result, err := processWithBackupRequest(ctx, gid, |
| 154 | func(ctx context.Context, c pb.WorkerClient) (interface{}, error) { |
| 155 | return c.ServeTask(ctx, q) |
| 156 | }) |
| 157 | networkSpan.End() |
| 158 | if err != nil { |
| 159 | return nil, err |
| 160 | } |
| 161 | |
| 162 | reply := result.(*pb.Result) |
| 163 | span.AddEvent("Reply from server", trace.WithAttributes( |
| 164 | attribute.Int("len", len(reply.UidMatrix)), |
| 165 | attribute.Int64("gid", int64(gid)), |
| 166 | attribute.String("attr", attr))) |
| 167 | return reply, nil |
| 168 | } |
| 169 | |
| 170 | // convertValue converts the data to the schema.State() type of predicate. |
| 171 | func convertValue(attr, data string) (types.Val, error) { |
no test coverage detected