MCPcopy Index your code
hub / github.com/cloudfoundry/cli / GetStreamingLogs

Function GetStreamingLogs

actor/sharedaction/logging.go:124–172  ·  view source on GitHub ↗
(appGUID string, client LogCacheClient)

Source from the content-addressed store, hash-verified

122}
123
124func GetStreamingLogs(appGUID string, client LogCacheClient) (<-chan LogMessage, <-chan error, context.CancelFunc) {
125
126 logrus.Info("Start Tailing Logs")
127
128 outgoingLogStream := make(chan LogMessage, 1000)
129 outgoingErrStream := make(chan error, 1000)
130 ctx, cancelFunc := context.WithCancel(context.Background())
131 go func() {
132 defer close(outgoingLogStream)
133 defer close(outgoingErrStream)
134
135 ts := latestEnvelopeTimestamp(client, outgoingErrStream, ctx, appGUID)
136
137 // if the context was cancelled we may not have seen an envelope
138 if ts.IsZero() {
139 return
140 }
141
142 const offset = 1 * time.Second
143 walkStartTime := ts.Add(-offset)
144
145 logcache.Walk(
146 ctx,
147 appGUID,
148 logcache.Visitor(func(envelopes []*loggregator_v2.Envelope) bool {
149 logMessages := convertEnvelopesToLogMessages(envelopes)
150 for _, logMessage := range logMessages {
151 select {
152 case <-ctx.Done():
153 return false
154 default:
155 outgoingLogStream <- *logMessage
156 }
157 }
158 return true
159 }),
160 client.Read,
161 logcache.WithWalkDelay(2*time.Second),
162 logcache.WithWalkStartTime(walkStartTime),
163 logcache.WithWalkEnvelopeTypes(logcache_v1.EnvelopeType_LOG),
164 logcache.WithWalkBackoff(newCliRetryBackoff(retryInterval, retryCount)),
165 logcache.WithWalkLogger(log.New(channelWriter{
166 errChannel: outgoingErrStream,
167 }, "", 0)),
168 )
169 }()
170
171 return outgoingLogStream, outgoingErrStream, cancelFunc
172}
173
174func latestEnvelopeTimestamp(client LogCacheClient, errs chan error, ctx context.Context, sourceID string) time.Time {
175

Calls 4

latestEnvelopeTimestampFunction · 0.85
newCliRetryBackoffFunction · 0.85
AddMethod · 0.65

Tested by

no test coverage detected