| 79 | } |
| 80 | |
| 81 | func queryCounter(ctx context.Context, txn *dgo.Txn, pred string) (Counter, error) { |
| 82 | span := trace.FromContext(ctx) |
| 83 | |
| 84 | ctx, cancel := context.WithTimeout(ctx, 5*time.Second) |
| 85 | defer cancel() |
| 86 | |
| 87 | var counter Counter |
| 88 | query := fmt.Sprintf("{ q(func: has(%s)) { uid, val: %s }}", pred, pred) |
| 89 | resp, err := txn.Query(ctx, query) |
| 90 | if err != nil { |
| 91 | return counter, errors.Wrapf(err, "while doing query") |
| 92 | } |
| 93 | |
| 94 | m := make(map[string][]Counter) |
| 95 | if err := json.Unmarshal(resp.Json, &m); err != nil { |
| 96 | return counter, err |
| 97 | } |
| 98 | switch len(m["q"]) { |
| 99 | case 0: |
| 100 | // Do nothing. |
| 101 | case 1: |
| 102 | counter = m["q"][0] |
| 103 | default: |
| 104 | x.Panic(errors.Errorf("Invalid response: %q", resp.Json)) |
| 105 | } |
| 106 | span.Annotatef(nil, "Found counter: %+v", counter) |
| 107 | counter.startTs = resp.GetTxn().GetStartTs() |
| 108 | counter.qLatency = time.Duration(resp.Latency.GetTotalNs()).Round(time.Millisecond) |
| 109 | return counter, nil |
| 110 | } |
| 111 | |
| 112 | func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) { |
| 113 | ro := conf.GetBool("ro") |