| 25 | } |
| 26 | |
| 27 | func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) { |
| 28 | if address == "" { |
| 29 | return nil, ErrEmptySocketAddress |
| 30 | } |
| 31 | ctx, cancel := context.WithCancel(ctx) |
| 32 | ss := &socketStream{ |
| 33 | cancel: cancel, |
| 34 | oneShot: oneShot, |
| 35 | scheme: scheme, |
| 36 | address: address, |
| 37 | streamBase: streamBase{ |
| 38 | sourcename: fmt.Sprintf("%s://%s", scheme, address), |
| 39 | lines: make(chan *logline.LogLine), |
| 40 | }, |
| 41 | } |
| 42 | |
| 43 | if err := ss.stream(ctx, wg, waker); err != nil { |
| 44 | return nil, err |
| 45 | } |
| 46 | return ss, nil |
| 47 | } |
| 48 | |
| 49 | // stream starts goroutines to read data from the stream socket, until Stop is called or the context is cancelled. |
| 50 | func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error { |