()
| 71 | } |
| 72 | |
| 73 | func (p *progress) reportOnce() { |
| 74 | mapEdgeCount := atomic.LoadInt64(&p.mapEdgeCount) |
| 75 | timestamp := time.Now().Format("15:04:05Z0700") |
| 76 | |
| 77 | switch phase(atomic.LoadInt32((*int32)(&p.phase))) { |
| 78 | case nothing: |
| 79 | case mapPhase: |
| 80 | rdfCount := atomic.LoadInt64(&p.nquadCount) |
| 81 | errCount := atomic.LoadInt64(&p.errCount) |
| 82 | elapsed := time.Since(p.start) |
| 83 | fmt.Printf("[%s] MAP %s nquad_count:%s err_count:%s nquad_speed:%s/sec "+ |
| 84 | "edge_count:%s edge_speed:%s/sec jemalloc: %s \n", |
| 85 | timestamp, |
| 86 | x.FixedDuration(elapsed), |
| 87 | niceFloat(float64(rdfCount)), |
| 88 | niceFloat(float64(errCount)), |
| 89 | niceFloat(float64(rdfCount)/elapsed.Seconds()), |
| 90 | niceFloat(float64(mapEdgeCount)), |
| 91 | niceFloat(float64(mapEdgeCount)/elapsed.Seconds()), |
| 92 | humanize.IBytes(uint64(z.NumAllocBytes())), |
| 93 | ) |
| 94 | case reducePhase: |
| 95 | now := time.Now() |
| 96 | elapsed := time.Since(p.startReduce) |
| 97 | if p.startReduce.IsZero() { |
| 98 | p.startReduce = time.Now() |
| 99 | elapsed = time.Second |
| 100 | } |
| 101 | reduceKeyCount := atomic.LoadInt64(&p.reduceKeyCount) |
| 102 | reduceEdgeCount := atomic.LoadInt64(&p.reduceEdgeCount) |
| 103 | pct := "" |
| 104 | if mapEdgeCount != 0 { |
| 105 | pct = fmt.Sprintf("%.2f%% ", 100*float64(reduceEdgeCount)/float64(mapEdgeCount)) |
| 106 | } |
| 107 | fmt.Printf("[%s] REDUCE %s %sedge_count:%s edge_speed:%s/sec "+ |
| 108 | "plist_count:%s plist_speed:%s/sec. Num Encoding MBs: %d. jemalloc: %s \n", |
| 109 | timestamp, |
| 110 | x.FixedDuration(now.Sub(p.start)), |
| 111 | pct, |
| 112 | niceFloat(float64(reduceEdgeCount)), |
| 113 | niceFloat(float64(reduceEdgeCount)/elapsed.Seconds()), |
| 114 | niceFloat(float64(reduceKeyCount)), |
| 115 | niceFloat(float64(reduceKeyCount)/elapsed.Seconds()), |
| 116 | atomic.LoadInt64(&p.numEncoding)/(1<<20), |
| 117 | humanize.IBytes(uint64(z.NumAllocBytes())), |
| 118 | ) |
| 119 | default: |
| 120 | x.AssertTruef(false, "invalid phase") |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | func (p *progress) endSummary() { |
| 125 | p.shutdown <- struct{}{} |
no test coverage detected