MCPcopy
hub / github.com/dgraph-io/badger / produceKVs

Method produceKVs

stream.go:155–252  ·  view source on GitHub ↗

produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.

(ctx context.Context, threadId int)

Source from the content-addressed store, hash-verified

153
154// produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
155func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
156 var size int
157 var txn *Txn
158 if st.readTs > 0 {
159 txn = st.db.NewTransactionAt(st.readTs, false)
160 } else {
161 txn = st.db.NewTransaction(false)
162 }
163 defer txn.Discard()
164
165 iterate := func(kr keyRange) error {
166 iterOpts := DefaultIteratorOptions
167 iterOpts.AllVersions = true
168 iterOpts.Prefix = st.Prefix
169 iterOpts.PrefetchValues = false
170 itr := txn.NewIterator(iterOpts)
171 itr.ThreadId = threadId
172 defer itr.Close()
173
174 // This unique stream id is used to identify all the keys from this iteration.
175 streamId := atomic.AddUint32(&st.nextStreamId, 1)
176
177 outList := new(pb.KVList)
178
179 sendIt := func() error {
180 select {
181 case st.kvChan <- outList:
182 case <-ctx.Done():
183 return ctx.Err()
184 }
185 outList = new(pb.KVList)
186 size = 0
187 return nil
188 }
189 var prevKey []byte
190 for itr.Seek(kr.left); itr.Valid(); {
191 // itr.Valid would only return true for keys with the provided Prefix in iterOpts.
192 item := itr.Item()
193 if bytes.Equal(item.Key(), prevKey) {
194 itr.Next()
195 continue
196 }
197 prevKey = append(prevKey[:0], item.Key()...)
198
199 // Check if we reached the end of the key range.
200 if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
201 break
202 }
203 // Check if we should pick this key.
204 if st.ChooseKey != nil && !st.ChooseKey(item) {
205 continue
206 }
207
208 // Now convert to key value.
209 list, err := st.KeyToList(item.KeyCopy(nil), itr)
210 if err != nil {
211 return err
212 }

Callers 1

OrchestrateMethod · 0.95

Calls 13

DiscardMethod · 0.95
NewIteratorMethod · 0.95
CloseMethod · 0.95
SeekMethod · 0.95
ValidMethod · 0.95
ItemMethod · 0.95
NextMethod · 0.95
NewTransactionAtMethod · 0.80
NewTransactionMethod · 0.80
KeyCopyMethod · 0.80
KeyMethod · 0.65
DoneMethod · 0.45

Tested by

no test coverage detected