stream starts goroutines to read data from the stream socket, until Stop is called or the context is cancelled.
(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker)
| 48 | |
| 49 | // stream starts goroutines to read data from the stream socket, until Stop is called or the context is cancelled. |
| 50 | func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error { |
| 51 | l, err := net.Listen(ss.scheme, ss.address) |
| 52 | if err != nil { |
| 53 | logErrors.Add(ss.address, 1) |
| 54 | return err |
| 55 | } |
| 56 | glog.V(2).Infof("stream(%s): opened new socket listener %+v", ss.sourcename, l) |
| 57 | |
| 58 | // signals when a connection has been opened |
| 59 | started := make(chan struct{}) |
| 60 | // tracks connection handling routines |
| 61 | var connWg sync.WaitGroup |
| 62 | |
| 63 | // Set up for shutdown |
| 64 | wg.Add(1) |
| 65 | go func() { |
| 66 | defer wg.Done() |
| 67 | // If oneshot, wait only for the one conn handler to start, otherwise |
| 68 | // wait for context Done or stopChan. |
| 69 | <-started |
| 70 | if !ss.oneShot { |
| 71 | <-ctx.Done() |
| 72 | } |
| 73 | glog.V(2).Infof("stream(%s): closing listener", ss.sourcename) |
| 74 | err := l.Close() |
| 75 | if err != nil { |
| 76 | glog.Info(err) |
| 77 | } |
| 78 | connWg.Wait() |
| 79 | close(ss.lines) |
| 80 | }() |
| 81 | |
| 82 | var connOnce sync.Once |
| 83 | |
| 84 | wg.Add(1) |
| 85 | go func() { |
| 86 | defer wg.Done() |
| 87 | for { |
| 88 | c, err := l.Accept() |
| 89 | if err != nil { |
| 90 | glog.Info(err) |
| 91 | return |
| 92 | } |
| 93 | glog.V(2).Infof("stream(%s): got new conn %v", ss.sourcename, c) |
| 94 | connWg.Add(1) |
| 95 | go ss.handleConn(ctx, &connWg, waker, c) |
| 96 | connOnce.Do(func() { close(started) }) |
| 97 | if ss.oneShot { |
| 98 | glog.Infof("stream(%s): oneshot mode, exiting accept loop", ss.sourcename) |
| 99 | return |
| 100 | } |
| 101 | } |
| 102 | }() |
| 103 | |
| 104 | return nil |
| 105 | } |
| 106 | |
| 107 | func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, c net.Conn) { |
no test coverage detected