| 162 | } |
| 163 | |
| 164 | func (c *Configuration) NewReader(dialer *kafka.Dialer, logger *log.Entry) (*kafka.Reader, error) { |
| 165 | rConf := kafka.ReaderConfig{ |
| 166 | Brokers: c.Brokers, |
| 167 | Topic: c.Topic, |
| 168 | Dialer: dialer, |
| 169 | Logger: kafka.LoggerFunc(logger.Debugf), |
| 170 | ErrorLogger: kafka.LoggerFunc(logger.Errorf), |
| 171 | } |
| 172 | |
| 173 | if c.GroupID != "" && c.Partition != 0 { |
| 174 | return &kafka.Reader{}, errors.New("cannot specify both group_id and partition") |
| 175 | } |
| 176 | |
| 177 | if c.GroupID != "" { |
| 178 | rConf.GroupID = c.GroupID |
| 179 | // kafka-go does not support calling SetOffset while using a consumer group |
| 180 | rConf.StartOffset = kafka.LastOffset |
| 181 | } else if c.Partition != 0 { |
| 182 | rConf.Partition = c.Partition |
| 183 | } else { |
| 184 | logger.Warnf("no group_id specified, crowdsec will only read from the 1st partition of the topic") |
| 185 | } |
| 186 | |
| 187 | if c.BatchConfiguration.BatchMinBytes != 0 { |
| 188 | rConf.MinBytes = c.BatchConfiguration.BatchMinBytes |
| 189 | } |
| 190 | |
| 191 | if c.BatchConfiguration.BatchMaxBytes != 0 { |
| 192 | rConf.MaxBytes = c.BatchConfiguration.BatchMaxBytes |
| 193 | } |
| 194 | |
| 195 | if c.BatchConfiguration.BatchMaxWait != 0 { |
| 196 | rConf.MaxWait = c.BatchConfiguration.BatchMaxWait |
| 197 | } |
| 198 | |
| 199 | if c.BatchConfiguration.BatchQueueSize != 0 { |
| 200 | rConf.QueueCapacity = c.BatchConfiguration.BatchQueueSize |
| 201 | } |
| 202 | |
| 203 | if c.BatchConfiguration.CommitInterval != 0 { |
| 204 | rConf.CommitInterval = c.BatchConfiguration.CommitInterval |
| 205 | } |
| 206 | |
| 207 | if err := rConf.Validate(); err != nil { |
| 208 | return &kafka.Reader{}, fmt.Errorf("while validating reader configuration: %w", err) |
| 209 | } |
| 210 | |
| 211 | return kafka.NewReader(rConf), nil |
| 212 | } |