()
| 53 | ) |
| 54 | |
| 55 | func init() { |
| 56 | questions := f.TextFile( |
| 57 | "Posts100k.xml", 4, |
| 58 | ).Filter(func(line string) bool { |
| 59 | return strings.Contains(line, "<row") |
| 60 | }).Map(func(line string, ch chan SourcePost) { |
| 61 | var p SourcePost |
| 62 | err := xml.Unmarshal([]byte(line), &p) |
| 63 | if err != nil { |
| 64 | fmt.Printf("parse source post error %v: %s\n", err, line) |
| 65 | return |
| 66 | } |
| 67 | ch <- p |
| 68 | }).Filter(func(src SourcePost) bool { |
| 69 | return src.PostTypeId == 1 |
| 70 | }).Map(func(src SourcePost) (p Post) { |
| 71 | p.PostTypeId = src.PostTypeId |
| 72 | |
| 73 | t, err := time.Parse("2006-01-02T15:04:05.000", src.CreationDate) |
| 74 | if err != nil { |
| 75 | fmt.Printf("error parse creation date %s: %v\n", src.CreationDate, err) |
| 76 | } else { |
| 77 | p.CreationDate = t |
| 78 | } |
| 79 | |
| 80 | if len(src.Tags) > 0 { |
| 81 | p.Tags = strings.Split(src.Tags[1:len(src.Tags)-1], "><") |
| 82 | } |
| 83 | |
| 84 | return |
| 85 | }) |
| 86 | |
| 87 | questions.Map(func(p Post, out chan flow.KeyValue) { |
| 88 | if len(p.Tags) > 1 { |
| 89 | for _, t := range p.Tags { |
| 90 | out <- flow.KeyValue{t, 1} |
| 91 | } |
| 92 | } |
| 93 | }).ReduceByKey(func(x int, y int) int { |
| 94 | return x + y |
| 95 | }).Map(func(tag string, count int) flow.KeyValue { |
| 96 | return flow.KeyValue{count, tag} |
| 97 | }).Sort(func(a, b int) bool { |
| 98 | return a < b |
| 99 | }).Map(func(count int, tag string) { |
| 100 | fmt.Printf("%d %s\n", count, tag) |
| 101 | }) |
| 102 | |
| 103 | questions.Map(func(p Post) flow.KeyValue { |
| 104 | return flow.KeyValue{p.CreationDate.Format("2006-01"), 1} |
| 105 | }).ReduceByKey(func(x int, y int) int { |
| 106 | return x + y |
| 107 | }).Sort(nil).Map(func(month string, count int) { |
| 108 | fmt.Printf("%s %d\n", month, count) |
| 109 | }) |
| 110 | } |
| 111 | |
| 112 | func main() { |
nothing calls this directly
no test coverage detected