MCPcopy
hub / github.com/openimsdk/open-im-server / TestBatcher

Function TestBatcher

pkg/tools/batcher/batcher_test.go:11–66  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

9)
10
11func TestBatcher(t *testing.T) {
12 config := Config{
13 size: 1000,
14 buffer: 10,
15 worker: 10,
16 interval: 5 * time.Millisecond,
17 }
18
19 b := New[string](
20 WithSize(config.size),
21 WithBuffer(config.buffer),
22 WithWorker(config.worker),
23 WithInterval(config.interval),
24 WithSyncWait(true),
25 )
26
27 // Mock Do function to simply print values for demonstration
28 b.Do = func(ctx context.Context, channelID int, vals *Msg[string]) {
29 t.Logf("Channel %d Processed batch: %v", channelID, vals)
30 }
31 b.OnComplete = func(lastMessage *string, totalCount int) {
32 t.Logf("Completed processing with last message: %v, total count: %d", *lastMessage, totalCount)
33 }
34 b.Sharding = func(key string) int {
35 hashCode := stringutil.GetHashCode(key)
36 return int(hashCode) % config.worker
37 }
38 b.Key = func(data *string) string {
39 return *data
40 }
41
42 err := b.Start()
43 if err != nil {
44 t.Fatal(err)
45 }
46
47 // Test normal data processing
48 for i := 0; i < 10000; i++ {
49 data := "data" + fmt.Sprintf("%d", i)
50 if err := b.Put(context.Background(), &data); err != nil {
51 t.Fatal(err)
52 }
53 }
54
55 time.Sleep(time.Duration(1) * time.Second)
56 start := time.Now()
57 // Wait for all processing to finish
58 b.Close()
59
60 elapsed := time.Since(start)
61 t.Logf("Close took %s", elapsed)
62
63 if len(b.data) != 0 {
64 t.Error("Data channel should be empty after closing")
65 }
66}

Callers

nothing calls this directly

Calls 8

WithSizeFunction · 0.85
WithBufferFunction · 0.85
WithWorkerFunction · 0.85
WithIntervalFunction · 0.85
WithSyncWaitFunction · 0.85
PutMethod · 0.80
CloseMethod · 0.65
StartMethod · 0.45

Tested by

no test coverage detected