| 110 | } |
| 111 | |
| 112 | func process(dg *dgo.Dgraph, conf *viper.Viper) (Counter, error) { |
| 113 | ro := conf.GetBool("ro") |
| 114 | be := conf.GetBool("be") |
| 115 | pred := conf.GetString("pred") |
| 116 | var txn *dgo.Txn |
| 117 | |
| 118 | switch { |
| 119 | case be: |
| 120 | txn = dg.NewReadOnlyTxn().BestEffort() |
| 121 | case ro: |
| 122 | txn = dg.NewReadOnlyTxn() |
| 123 | default: |
| 124 | txn = dg.NewTxn() |
| 125 | } |
| 126 | defer func() { |
| 127 | if err := txn.Discard(context.Background()); err != nil { |
| 128 | fmt.Printf("Discarding transaction failed: %+v\n", err) |
| 129 | } |
| 130 | }() |
| 131 | |
| 132 | ctx, span := trace.StartSpan(context.Background(), "Counter") |
| 133 | defer span.End() |
| 134 | |
| 135 | counter, err := queryCounter(ctx, txn, pred) |
| 136 | if err != nil { |
| 137 | return Counter{}, err |
| 138 | } |
| 139 | if be || ro { |
| 140 | return counter, nil |
| 141 | } |
| 142 | |
| 143 | counter.Val++ |
| 144 | var mu api.Mutation |
| 145 | mu.CommitNow = true |
| 146 | if len(counter.Uid) == 0 { |
| 147 | counter.Uid = "_:new" |
| 148 | } |
| 149 | mu.SetNquads = []byte(fmt.Sprintf(`<%s> <%s> "%d"^^<xs:int> .`, counter.Uid, pred, counter.Val)) |
| 150 | |
| 151 | // Don't put any timeout for mutation. |
| 152 | resp, err := txn.Mutate(ctx, &mu) |
| 153 | if err != nil { |
| 154 | return Counter{}, err |
| 155 | } |
| 156 | |
| 157 | counter.mLatency = time.Duration(resp.Latency.GetTotalNs()).Round(time.Millisecond) |
| 158 | return counter, nil |
| 159 | } |
| 160 | |
| 161 | func run(conf *viper.Viper) { |
| 162 | trace.ApplyConfig(trace.Config{ |