hub / github.com/dgraph-io/dgraph / processSort

Function processSort

worker/sort.go:494–564

processSort does sorting with pagination. It works by iterating over index buckets. As it iterates, it intersects with each UID list of the UID matrix. To optimize for pagination, we maintain the "offsets and sizes" or pagination window for each UID list. For each UID list, we ignore the bucket if w

(ctx context.Context, ts *pb.SortMessage)

Source from the content-addressed store, hash-verified

// enough for our pagination params. When all the UID lists are done, we stop
// iterating over the index.
func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error) {
	span := trace.SpanFromContext(ctx)
	stop := x.SpanTimer(span, "processSort")
	defer stop()

	span.SetAttributes(
		attribute.Int("startTs", int(ts.ReadTs)),
	)

	if err := posting.Oracle().WaitForTs(ctx, ts.ReadTs); err != nil {
		return nil, err
	}
	span.SetAttributes(
		attribute.String("checksumMatch", "true"),
	)

	if ts.Count < 0 {
		return nil, errors.Errorf(
			"We do not yet support negative or infinite count with sorting: %s %d. "+
				"Try flipping order and return first few elements instead.",
			x.ParseAttr(ts.Order[0].Attr), ts.Count)
	}
	// TODO (pawan) - Why check only the first attribute, what if other attributes are of list type?
	if schema.State().IsList(ts.Order[0].Attr) {
		return nil, errors.Errorf("Sorting not supported on attr: %s of type: [scalar]",
			x.ParseAttr(ts.Order[0].Attr))
	}

	// We're not using any txn local cache here. So, no need to deal with that yet.
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	resCh := make(chan *sortresult, 2)
	go func() {
		select {
		case <-time.After(3 * time.Millisecond):
			// Wait between ctx chan and time chan.
		case <-ctx.Done():
			resCh <- &sortresult{err: ctx.Err()}
			return
		}
		r := sortWithoutIndex(cctx, ts)
		resCh <- r
	}()

	go func() {
		sr := sortWithIndex(cctx, ts)
		resCh <- sr
	}()

	r := <-resCh
	if r.err == nil {
		cancel()
		// wait for other goroutine to get cancelled
		<-resCh
	} else {
		span.AddEvent(fmt.Sprintf("Error processing sort: %+v", r.err))
		r = <-resCh

Callers 2

SortOverNetwork · Function · 0.85
Sort · Method · 0.85

Calls 12

SpanTimer · Function · 0.92
Oracle · Function · 0.92
ParseAttr · Function · 0.92
State · Function · 0.92
sortWithoutIndex · Function · 0.85
sortWithIndex · Function · 0.85
multiSort · Function · 0.85
WaitForTs · Method · 0.80
IsList · Method · 0.80
String · Method · 0.45
Errorf · Method · 0.45
Done · Method · 0.45

Tested by

no test coverage detected