| 222 | } |
| 223 | |
| 224 | func (mi *mapIterator) Next(cbuf *z.Buffer, partitionKey []byte) { |
| 225 | readMapEntry := func() error { |
| 226 | if len(mi.meBuf) > 0 { |
| 227 | return nil |
| 228 | } |
| 229 | r := mi.reader |
| 230 | sizeBuf, err := r.Peek(binary.MaxVarintLen64) |
| 231 | if err != nil { |
| 232 | return err |
| 233 | } |
| 234 | sz, n := binary.Uvarint(sizeBuf) |
| 235 | if n <= 0 { |
| 236 | log.Fatalf("Could not read uvarint: %d", n) |
| 237 | } |
| 238 | x.Check2(r.Discard(n)) |
| 239 | if cap(mi.meBuf) < int(sz) { |
| 240 | mi.meBuf = make([]byte, int(sz)) |
| 241 | } |
| 242 | mi.meBuf = mi.meBuf[:int(sz)] |
| 243 | x.Check2(io.ReadFull(r, mi.meBuf)) |
| 244 | return nil |
| 245 | } |
| 246 | for { |
| 247 | if err := readMapEntry(); err == io.EOF { |
| 248 | break |
| 249 | } else { |
| 250 | x.Check(err) |
| 251 | } |
| 252 | key := MapEntry(mi.meBuf).Key() |
| 253 | |
| 254 | if len(partitionKey) == 0 || bytes.Compare(key, partitionKey) < 0 { |
| 255 | b := cbuf.SliceAllocate(len(mi.meBuf)) |
| 256 | copy(b, mi.meBuf) |
| 257 | mi.meBuf = mi.meBuf[:0] |
| 258 | // map entry is already part of cBuf. |
| 259 | continue |
| 260 | } |
| 261 | // Current key is not part of this batch so track that we have already read the key. |
| 262 | return |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | func (mi *mapIterator) Close() error { |
| 267 | return mi.fd.Close() |