MCPcopy
hub / github.com/crowdsecurity/crowdsec / NewReader

Method NewReader

pkg/acquisition/modules/kafka/config.go:164–212  ·  view source on GitHub ↗
(dialer *kafka.Dialer, logger *log.Entry)

Source from the content-addressed store, hash-verified

162}
163
164func (c *Configuration) NewReader(dialer *kafka.Dialer, logger *log.Entry) (*kafka.Reader, error) {
165 rConf := kafka.ReaderConfig{
166 Brokers: c.Brokers,
167 Topic: c.Topic,
168 Dialer: dialer,
169 Logger: kafka.LoggerFunc(logger.Debugf),
170 ErrorLogger: kafka.LoggerFunc(logger.Errorf),
171 }
172
173 if c.GroupID != "" && c.Partition != 0 {
174 return &kafka.Reader{}, errors.New("cannot specify both group_id and partition")
175 }
176
177 if c.GroupID != "" {
178 rConf.GroupID = c.GroupID
179 // kafka-go does not support calling SetOffset while using a consumer group
180 rConf.StartOffset = kafka.LastOffset
181 } else if c.Partition != 0 {
182 rConf.Partition = c.Partition
183 } else {
184 logger.Warnf("no group_id specified, crowdsec will only read from the 1st partition of the topic")
185 }
186
187 if c.BatchConfiguration.BatchMinBytes != 0 {
188 rConf.MinBytes = c.BatchConfiguration.BatchMinBytes
189 }
190
191 if c.BatchConfiguration.BatchMaxBytes != 0 {
192 rConf.MaxBytes = c.BatchConfiguration.BatchMaxBytes
193 }
194
195 if c.BatchConfiguration.BatchMaxWait != 0 {
196 rConf.MaxWait = c.BatchConfiguration.BatchMaxWait
197 }
198
199 if c.BatchConfiguration.BatchQueueSize != 0 {
200 rConf.QueueCapacity = c.BatchConfiguration.BatchQueueSize
201 }
202
203 if c.BatchConfiguration.CommitInterval != 0 {
204 rConf.CommitInterval = c.BatchConfiguration.CommitInterval
205 }
206
207 if err := rConf.Validate(); err != nil {
208 return &kafka.Reader{}, fmt.Errorf("while validating reader configuration: %w", err)
209 }
210
211 return kafka.NewReader(rConf), nil
212}

Callers 15

fireHandlerFunction · 0.80
smokeHandlerFunction · 0.80
searchHandlerFunction · 0.80
smokeHandlerFunction · 0.80
ValidateYAMLFunction · 0.80
ParseSourceConfigFunction · 0.80
sourcesFromFileFunction · 0.80
processRequestMethod · 0.80

Calls 1

ValidateMethod · 0.65