()
| 239 | } |
| 240 | |
| 241 | func (i *FileInput) init() (err error) { |
| 242 | defer i.mu.Unlock() |
| 243 | i.mu.Lock() |
| 244 | |
| 245 | var matches []string |
| 246 | |
| 247 | if strings.HasPrefix(i.path, "s3://") { |
| 248 | sess := session.Must(session.NewSession(awsConfig())) |
| 249 | svc := s3.New(sess) |
| 250 | |
| 251 | bucket, key := parseS3Url(i.path) |
| 252 | |
| 253 | params := &s3.ListObjectsInput{ |
| 254 | Bucket: aws.String(bucket), |
| 255 | Prefix: aws.String(key), |
| 256 | } |
| 257 | |
| 258 | resp, err := svc.ListObjects(params) |
| 259 | if err != nil { |
| 260 | Debug(0, "[INPUT-FILE] Error while retreiving list of files from S3", i.path, err) |
| 261 | return err |
| 262 | } |
| 263 | |
| 264 | for _, c := range resp.Contents { |
| 265 | matches = append(matches, "s3://"+bucket+"/"+(*c.Key)) |
| 266 | } |
| 267 | } else if matches, err = filepath.Glob(i.path); err != nil { |
| 268 | Debug(0, "[INPUT-FILE] Wrong file pattern", i.path, err) |
| 269 | return |
| 270 | } |
| 271 | |
| 272 | if len(matches) == 0 { |
| 273 | Debug(0, "[INPUT-FILE] No files match pattern: ", i.path) |
| 274 | return errors.New("No matching files") |
| 275 | } |
| 276 | |
| 277 | i.readers = make([]*fileInputReader, len(matches)) |
| 278 | |
| 279 | for idx, p := range matches { |
| 280 | i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun) |
| 281 | } |
| 282 | |
| 283 | i.stats.Add("reader_count", int64(len(matches))) |
| 284 | |
| 285 | return nil |
| 286 | } |
| 287 | |
| 288 | // PluginRead reads message from this plugin |
| 289 | func (i *FileInput) PluginRead() (*Message, error) { |
no test coverage detected