(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn)
| 105 | } |
| 106 | |
| 107 | func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn) { |
| 108 | defer wg.Done() |
| 109 | |
| 110 | lr := NewLineReader(ss.sourcename, ss.lines, c, defaultReadBufferSize, ss.cancel) |
| 111 | var total int |
| 112 | defer func() { |
| 113 | glog.V(2).Infof("stream(%s): read total %d bytes from %s", ss.sourcename, c, total) |
| 114 | glog.V(2).Infof("stream(%s): closing connection, %v", ss.sourcename, c) |
| 115 | err := c.Close() |
| 116 | if err != nil { |
| 117 | logErrors.Add(ss.address, 1) |
| 118 | glog.Info(err) |
| 119 | } |
| 120 | lr.Finish(ctx) |
| 121 | logCloses.Add(ss.address, 1) |
| 122 | }() |
| 123 | ctx, cancel := context.WithCancel(ctx) |
| 124 | defer cancel() |
| 125 | SetReadDeadlineOnDone(ctx, c) |
| 126 | |
| 127 | for { |
| 128 | n, err := lr.ReadAndSend(ctx) |
| 129 | glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ss.sourcename, n, err) |
| 130 | |
| 131 | if n > 0 { |
| 132 | total += n |
| 133 | |
| 134 | // No error implies more to read, so restart the loop. |
| 135 | if err == nil && ctx.Err() == nil { |
| 136 | continue |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | if IsExitableError(err) { |
| 141 | glog.V(2).Infof("stream(%s): exiting, conn has error %s", ss.sourcename, err) |
| 142 | return |
| 143 | } |
| 144 | |
| 145 | // Yield and wait |
| 146 | glog.V(2).Infof("stream(%s): waiting", ss.sourcename) |
| 147 | select { |
| 148 | case <-ctx.Done(): |
| 149 | // Exit after next read attempt. |
| 150 | glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address) |
| 151 | case <-waker.Wake(): |
| 152 | // sleep until next Wake() |
| 153 | glog.V(2).Infof("stream(%s): Wake received", ss.sourcename) |
| 154 | } |
| 155 | } |
| 156 | } |
no test coverage detected