(t *testing.T)
| 159 | } |
| 160 | |
| 161 | func TestStreamWithThreadId(t *testing.T) { |
| 162 | dir, err := ioutil.TempDir("", "badger-test") |
| 163 | require.NoError(t, err) |
| 164 | defer removeDir(dir) |
| 165 | |
| 166 | db, err := OpenManaged(DefaultOptions(dir)) |
| 167 | require.NoError(t, err) |
| 168 | |
| 169 | var count int |
| 170 | for _, prefix := range []string{"p0", "p1", "p2"} { |
| 171 | txn := db.NewTransactionAt(math.MaxUint64, true) |
| 172 | for i := 1; i <= 100; i++ { |
| 173 | require.NoError(t, txn.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i)))) |
| 174 | count++ |
| 175 | } |
| 176 | require.NoError(t, txn.CommitAt(5, nil)) |
| 177 | } |
| 178 | |
| 179 | stream := db.NewStreamAt(math.MaxUint64) |
| 180 | stream.LogPrefix = "Testing" |
| 181 | stream.KeyToList = func(key []byte, itr *Iterator) ( |
| 182 | *bpb.KVList, error) { |
| 183 | require.Less(t, itr.ThreadId, stream.NumGo) |
| 184 | return stream.ToList(key, itr) |
| 185 | } |
| 186 | c := &collector{} |
| 187 | stream.Send = c.Send |
| 188 | |
| 189 | err = stream.Orchestrate(ctxb) |
| 190 | require.NoError(t, err) |
| 191 | require.Equal(t, 300, len(c.kv), "Expected 300. Got: %d", len(c.kv)) |
| 192 | |
| 193 | m := make(map[string]int) |
| 194 | for _, kv := range c.kv { |
| 195 | prefix, ki := keyToInt(kv.Key) |
| 196 | expected := value(ki) |
| 197 | require.Equal(t, expected, kv.Value) |
| 198 | m[prefix]++ |
| 199 | } |
| 200 | require.Equal(t, 3, len(m)) |
| 201 | for pred, count := range m { |
| 202 | require.Equal(t, 100, count, "Count mismatch for pred: %s", pred) |
| 203 | } |
| 204 | require.NoError(t, db.Close()) |
| 205 | } |
| 206 | |
| 207 | func TestBigStream(t *testing.T) { |
| 208 | // Set the maxStreamSize to 1MB for the duration of the test so that it can use a smaller |
nothing calls this directly
no test coverage detected
searching dependent graphs…