| 115 | } |
| 116 | |
| 117 | func (b *Batcher[T]) Start() error { |
| 118 | if b.Sharding == nil { |
| 119 | return errs.New("Sharding function is required").Wrap() |
| 120 | } |
| 121 | if b.Do == nil { |
| 122 | return errs.New("Do function is required").Wrap() |
| 123 | } |
| 124 | if b.Key == nil { |
| 125 | return errs.New("Key function is required").Wrap() |
| 126 | } |
| 127 | b.wait.Add(b.config.worker) |
| 128 | for i := 0; i < b.config.worker; i++ { |
| 129 | go b.run(i, b.chArrays[i]) |
| 130 | } |
| 131 | b.wait.Add(1) |
| 132 | go b.scheduler() |
| 133 | return nil |
| 134 | } |
| 135 | |
| 136 | func (b *Batcher[T]) Put(ctx context.Context, data *T) error { |
| 137 | if data == nil { |