produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
(ctx context.Context, threadId int)
| 153 | |
| 154 | // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. |
| 155 | func (st *Stream) produceKVs(ctx context.Context, threadId int) error { |
| 156 | var size int |
| 157 | var txn *Txn |
| 158 | if st.readTs > 0 { |
| 159 | txn = st.db.NewTransactionAt(st.readTs, false) |
| 160 | } else { |
| 161 | txn = st.db.NewTransaction(false) |
| 162 | } |
| 163 | defer txn.Discard() |
| 164 | |
| 165 | iterate := func(kr keyRange) error { |
| 166 | iterOpts := DefaultIteratorOptions |
| 167 | iterOpts.AllVersions = true |
| 168 | iterOpts.Prefix = st.Prefix |
| 169 | iterOpts.PrefetchValues = false |
| 170 | itr := txn.NewIterator(iterOpts) |
| 171 | itr.ThreadId = threadId |
| 172 | defer itr.Close() |
| 173 | |
| 174 | // This unique stream id is used to identify all the keys from this iteration. |
| 175 | streamId := atomic.AddUint32(&st.nextStreamId, 1) |
| 176 | |
| 177 | outList := new(pb.KVList) |
| 178 | |
| 179 | sendIt := func() error { |
| 180 | select { |
| 181 | case st.kvChan <- outList: |
| 182 | case <-ctx.Done(): |
| 183 | return ctx.Err() |
| 184 | } |
| 185 | outList = new(pb.KVList) |
| 186 | size = 0 |
| 187 | return nil |
| 188 | } |
| 189 | var prevKey []byte |
| 190 | for itr.Seek(kr.left); itr.Valid(); { |
| 191 | // it.Valid would only return true for keys with the provided Prefix in iterOpts. |
| 192 | item := itr.Item() |
| 193 | if bytes.Equal(item.Key(), prevKey) { |
| 194 | itr.Next() |
| 195 | continue |
| 196 | } |
| 197 | prevKey = append(prevKey[:0], item.Key()...) |
| 198 | |
| 199 | // Check if we reached the end of the key range. |
| 200 | if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 { |
| 201 | break |
| 202 | } |
| 203 | // Check if we should pick this key. |
| 204 | if st.ChooseKey != nil && !st.ChooseKey(item) { |
| 205 | continue |
| 206 | } |
| 207 | |
| 208 | // Now convert to key value. |
| 209 | list, err := st.KeyToList(item.KeyCopy(nil), itr) |
| 210 | if err != nil { |
| 211 | return err |
| 212 | } |
no test coverage detected