MCPcopy
hub / github.com/google/mtail / handleConn

Method handleConn

internal/tailer/logstream/socketstream.go:107–156  ·  view source on GitHub ↗
(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn)

Source from the content-addressed store, hash-verified

105}
106
107func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn) {
108 defer wg.Done()
109
110 lr := NewLineReader(ss.sourcename, ss.lines, c, defaultReadBufferSize, ss.cancel)
111 var total int
112 defer func() {
113 glog.V(2).Infof("stream(%s): read total %d bytes from %s", ss.sourcename, c, total)
114 glog.V(2).Infof("stream(%s): closing connection, %v", ss.sourcename, c)
115 err := c.Close()
116 if err != nil {
117 logErrors.Add(ss.address, 1)
118 glog.Info(err)
119 }
120 lr.Finish(ctx)
121 logCloses.Add(ss.address, 1)
122 }()
123 ctx, cancel := context.WithCancel(ctx)
124 defer cancel()
125 SetReadDeadlineOnDone(ctx, c)
126
127 for {
128 n, err := lr.ReadAndSend(ctx)
129 glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ss.sourcename, n, err)
130
131 if n > 0 {
132 total += n
133
134 // No error implies more to read, so restart the loop.
135 if err == nil && ctx.Err() == nil {
136 continue
137 }
138 }
139
140 if IsExitableError(err) {
141 glog.V(2).Infof("stream(%s): exiting, conn has error %s", ss.sourcename, err)
142 return
143 }
144
145 // Yield and wait
146 glog.V(2).Infof("stream(%s): waiting", ss.sourcename)
147 select {
148 case <-ctx.Done():
149 // Exit after next read attempt.
150 glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address)
151 case <-waker.Wake():
152 // sleep until next Wake()
153 glog.V(2).Infof("stream(%s): Wake received", ss.sourcename)
154 }
155 }
156}

Callers 1

streamMethod · 0.95

Calls 7

FinishMethod · 0.95
ReadAndSendMethod · 0.95
NewLineReaderFunction · 0.85
SetReadDeadlineOnDoneFunction · 0.85
IsExitableErrorFunction · 0.85
WakeMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected