(path string, readDepth int, dryRun bool)
| 165 | } |
| 166 | |
| 167 | func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReader { |
| 168 | var file io.ReadCloser |
| 169 | var err error |
| 170 | |
| 171 | if strings.HasPrefix(path, "s3://") { |
| 172 | file = NewS3ReadCloser(path) |
| 173 | } else { |
| 174 | file, err = os.Open(path) |
| 175 | } |
| 176 | |
| 177 | if err != nil { |
| 178 | Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err)) |
| 179 | return nil |
| 180 | } |
| 181 | |
| 182 | r := &fileInputReader{path: path, file: file, closed: 0, readDepth: readDepth, dryRun: dryRun} |
| 183 | if strings.HasSuffix(path, ".gz") { |
| 184 | gzReader, err := gzip.NewReader(file) |
| 185 | if err != nil { |
| 186 | Debug(0, fmt.Sprintf("[INPUT-FILE] err: %q", err)) |
| 187 | return nil |
| 188 | } |
| 189 | r.reader = bufio.NewReader(gzReader) |
| 190 | } else { |
| 191 | r.reader = bufio.NewReader(file) |
| 192 | } |
| 193 | |
| 194 | heap.Init(&r.queue) |
| 195 | |
| 196 | init := make(chan struct{}) |
| 197 | go r.parse(init) |
| 198 | <-init |
| 199 | |
| 200 | return r |
| 201 | } |
| 202 | |
| 203 | // FileInput can read requests generated by FileOutput |
| 204 | type FileInput struct { |
no test coverage detected