(t *testing.T)
| 9 | ) |
| 10 | |
| 11 | func TestBatcher(t *testing.T) { |
| 12 | config := Config{ |
| 13 | size: 1000, |
| 14 | buffer: 10, |
| 15 | worker: 10, |
| 16 | interval: 5 * time.Millisecond, |
| 17 | } |
| 18 | |
| 19 | b := New[string]( |
| 20 | WithSize(config.size), |
| 21 | WithBuffer(config.buffer), |
| 22 | WithWorker(config.worker), |
| 23 | WithInterval(config.interval), |
| 24 | WithSyncWait(true), |
| 25 | ) |
| 26 | |
| 27 | // Mock Do function to simply print values for demonstration |
| 28 | b.Do = func(ctx context.Context, channelID int, vals *Msg[string]) { |
| 29 | t.Logf("Channel %d Processed batch: %v", channelID, vals) |
| 30 | } |
| 31 | b.OnComplete = func(lastMessage *string, totalCount int) { |
| 32 | t.Logf("Completed processing with last message: %v, total count: %d", *lastMessage, totalCount) |
| 33 | } |
| 34 | b.Sharding = func(key string) int { |
| 35 | hashCode := stringutil.GetHashCode(key) |
| 36 | return int(hashCode) % config.worker |
| 37 | } |
| 38 | b.Key = func(data *string) string { |
| 39 | return *data |
| 40 | } |
| 41 | |
| 42 | err := b.Start() |
| 43 | if err != nil { |
| 44 | t.Fatal(err) |
| 45 | } |
| 46 | |
| 47 | // Test normal data processing |
| 48 | for i := 0; i < 10000; i++ { |
| 49 | data := "data" + fmt.Sprintf("%d", i) |
| 50 | if err := b.Put(context.Background(), &data); err != nil { |
| 51 | t.Fatal(err) |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | time.Sleep(time.Duration(1) * time.Second) |
| 56 | start := time.Now() |
| 57 | // Wait for all processing to finish |
| 58 | b.Close() |
| 59 | |
| 60 | elapsed := time.Since(start) |
| 61 | t.Logf("Close took %s", elapsed) |
| 62 | |
| 63 | if len(b.data) != 0 { |
| 64 | t.Error("Data channel should be empty after closing") |
| 65 | } |
| 66 | } |
nothing calls this directly
no test coverage detected