MCPcopy
hub / github.com/google/mtail / newSocketStream

Function newSocketStream

internal/tailer/logstream/socketstream.go:27–47  ·  view source on GitHub ↗
(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode)

Source from the content-addressed store, hash-verified

25}
26
27func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
28 if address == "" {
29 return nil, ErrEmptySocketAddress
30 }
31 ctx, cancel := context.WithCancel(ctx)
32 ss := &socketStream{
33 cancel: cancel,
34 oneShot: oneShot,
35 scheme: scheme,
36 address: address,
37 streamBase: streamBase{
38 sourcename: fmt.Sprintf("%s://%s", scheme, address),
39 lines: make(chan *logline.LogLine),
40 },
41 }
42
43 if err := ss.stream(ctx, wg, waker); err != nil {
44 return nil, err
45 }
46 return ss, nil
47}
48
49// stream starts goroutines to read data from the stream socket, until Stop is called or the context is cancelled.
50func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error {

Callers 1

NewFunction · 0.85

Calls 1

streamMethod · 0.95

Tested by

no test coverage detected