processSort does sorting with pagination. It works by iterating over index buckets. As it iterates, it intersects with each UID list of the UID matrix. To optimize for pagination, we maintain the "offsets and sizes" or pagination window for each UID list. For each UID list, we ignore the bucket if w
(ctx context.Context, ts *pb.SortMessage)
| 492 | // enough for our pagination params. When all the UID lists are done, we stop |
| 493 | // iterating over the index. |
| 494 | func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error) { |
| 495 | span := trace.SpanFromContext(ctx) |
| 496 | stop := x.SpanTimer(span, "processSort") |
| 497 | defer stop() |
| 498 | |
| 499 | span.SetAttributes( |
| 500 | attribute.Int("startTs", int(ts.ReadTs)), |
| 501 | ) |
| 502 | |
| 503 | if err := posting.Oracle().WaitForTs(ctx, ts.ReadTs); err != nil { |
| 504 | return nil, err |
| 505 | } |
| 506 | span.SetAttributes( |
| 507 | attribute.String("checksumMatch", "true"), |
| 508 | ) |
| 509 | |
| 510 | if ts.Count < 0 { |
| 511 | return nil, errors.Errorf( |
| 512 | "We do not yet support negative or infinite count with sorting: %s %d. "+ |
| 513 | "Try flipping order and return first few elements instead.", |
| 514 | x.ParseAttr(ts.Order[0].Attr), ts.Count) |
| 515 | } |
| 516 | // TODO (pawan) - Why check only the first attribute, what if other attributes are of list type? |
| 517 | if schema.State().IsList(ts.Order[0].Attr) { |
| 518 | return nil, errors.Errorf("Sorting not supported on attr: %s of type: [scalar]", |
| 519 | x.ParseAttr(ts.Order[0].Attr)) |
| 520 | } |
| 521 | |
| 522 | // We're not using any txn local cache here. So, no need to deal with that yet. |
| 523 | cctx, cancel := context.WithCancel(ctx) |
| 524 | defer cancel() |
| 525 | |
| 526 | resCh := make(chan *sortresult, 2) |
| 527 | go func() { |
| 528 | select { |
| 529 | case <-time.After(3 * time.Millisecond): |
| 530 | // Wait between ctx chan and time chan. |
| 531 | case <-ctx.Done(): |
| 532 | resCh <- &sortresult{err: ctx.Err()} |
| 533 | return |
| 534 | } |
| 535 | r := sortWithoutIndex(cctx, ts) |
| 536 | resCh <- r |
| 537 | }() |
| 538 | |
| 539 | go func() { |
| 540 | sr := sortWithIndex(cctx, ts) |
| 541 | resCh <- sr |
| 542 | }() |
| 543 | |
| 544 | r := <-resCh |
| 545 | if r.err == nil { |
| 546 | cancel() |
| 547 | // wait for other goroutine to get cancelled |
| 548 | <-resCh |
| 549 | } else { |
| 550 | span.AddEvent(fmt.Sprintf("Error processing sort: %+v", r.err)) |
| 551 | r = <-resCh |
no test coverage detected