MCPcopy
hub / github.com/google/mtail / stream

Method stream

internal/tailer/logstream/socketstream.go:50–105  ·  view source on GitHub ↗

stream starts goroutines to read data from the stream socket, until Stop is called or the context is cancelled.

(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker)

Source from the content-addressed store, hash-verified

48
49// stream starts goroutines to read data from the stream socket, until Stop is called or the context is cancelled.
50func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error {
51 l, err := net.Listen(ss.scheme, ss.address)
52 if err != nil {
53 logErrors.Add(ss.address, 1)
54 return err
55 }
56 glog.V(2).Infof("stream(%s): opened new socket listener %+v", ss.sourcename, l)
57
58 // signals when a connection has been opened
59 started := make(chan struct{})
60 // tracks connection handling routines
61 var connWg sync.WaitGroup
62
63 // Set up for shutdown
64 wg.Add(1)
65 go func() {
66 defer wg.Done()
67 // If oneshot, wait only for the one conn handler to start, otherwise
68 // wait for context Done or stopChan.
69 <-started
70 if !ss.oneShot {
71 <-ctx.Done()
72 }
73 glog.V(2).Infof("stream(%s): closing listener", ss.sourcename)
74 err := l.Close()
75 if err != nil {
76 glog.Info(err)
77 }
78 connWg.Wait()
79 close(ss.lines)
80 }()
81
82 var connOnce sync.Once
83
84 wg.Add(1)
85 go func() {
86 defer wg.Done()
87 for {
88 c, err := l.Accept()
89 if err != nil {
90 glog.Info(err)
91 return
92 }
93 glog.V(2).Infof("stream(%s): got new conn %v", ss.sourcename, c)
94 connWg.Add(1)
95 go ss.handleConn(ctx, &connWg, waker, c)
96 connOnce.Do(func() { close(started) })
97 if ss.oneShot {
98 glog.Infof("stream(%s): oneshot mode, exiting accept loop", ss.sourcename)
99 return
100 }
101 }
102 }()
103
104 return nil
105}
106
107func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn) {

Callers 1

newSocketStreamFunction · 0.95

Calls 2

handleConnMethod · 0.95
AddMethod · 0.45

Tested by

no test coverage detected