TextFile returns a new Dataset which reads the text file fname line by line, and distributes them evenly among multiple shards.
(fname string, shard int)
| 52 | // TextFile returns a new Dataset which reads the text file fname line by line, |
| 53 | // and distributes them evenly among multiple shards. |
| 54 | func (fc *FlowContext) TextFile(fname string, shard int) (ret *Dataset) { |
| 55 | fn := func(out chan string) { |
| 56 | file, err := os.Open(fname) |
| 57 | if err != nil { |
| 58 | // FIXME collect errors |
| 59 | log.Panicf("Can not open file %s: %v", fname, err) |
| 60 | return |
| 61 | } |
| 62 | defer file.Close() |
| 63 | |
| 64 | scanner := bufio.NewScanner(file) |
| 65 | for scanner.Scan() { |
| 66 | out <- scanner.Text() |
| 67 | } |
| 68 | |
| 69 | if err := scanner.Err(); err != nil { |
| 70 | log.Printf("Scan file %s: %v", fname, err) |
| 71 | } |
| 72 | } |
| 73 | return fc.Source(fn, shard) |
| 74 | } |
| 75 | |
| 76 | // Channel returns a new Dataset which has the input channel as the input and sends the received |
| 77 | // values to tasks. |
no test coverage detected