Sort is used to sort the given UID matrix.
(ctx context.Context, s *pb.SortMessage)
| 87 | |
| 88 | // Sort is used to sort given UID matrix. |
| 89 | func (w *grpcWorker) Sort(ctx context.Context, s *pb.SortMessage) (*pb.SortResult, error) { |
| 90 | if ctx.Err() != nil { |
| 91 | return &emptySortResult, ctx.Err() |
| 92 | } |
| 93 | |
| 94 | // Manually extract trace context from gRPC metadata for cross-alpha tracing |
| 95 | ctx = x.ExtractTraceContext(ctx) |
| 96 | |
| 97 | ctx, span := otel.Tracer("").Start(ctx, "worker.Sort") |
| 98 | defer span.End() |
| 99 | |
| 100 | gid, err := groups().BelongsToReadOnly(s.Order[0].Attr, s.ReadTs) |
| 101 | if err != nil { |
| 102 | return &emptySortResult, err |
| 103 | } |
| 104 | |
| 105 | span.AddEvent("Sorting", trace.WithAttributes( |
| 106 | attribute.String("attribute", s.Order[0].Attr), |
| 107 | attribute.Int("groupId", int(gid)))) |
| 108 | |
| 109 | if gid != groups().groupId() { |
| 110 | return nil, errors.Errorf("attr: %q groupId: %v Request sent to wrong server.", |
| 111 | s.Order[0].Attr, gid) |
| 112 | } |
| 113 | |
| 114 | var reply *pb.SortResult |
| 115 | c := make(chan error, 1) |
| 116 | go func() { |
| 117 | var err error |
| 118 | reply, err = processSort(ctx, s) |
| 119 | c <- err |
| 120 | }() |
| 121 | |
| 122 | select { |
| 123 | case <-ctx.Done(): |
| 124 | return &emptySortResult, ctx.Err() |
| 125 | case err := <-c: |
| 126 | return reply, err |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | var ( |
| 131 | errContinue = errors.Errorf("Continue processing buckets") |
Nothing calls this directly.
No test coverage detected.