(appGUID string, client LogCacheClient)
| 122 | } |
| 123 | |
| 124 | func GetStreamingLogs(appGUID string, client LogCacheClient) (<-chan LogMessage, <-chan error, context.CancelFunc) { |
| 125 | |
| 126 | logrus.Info("Start Tailing Logs") |
| 127 | |
| 128 | outgoingLogStream := make(chan LogMessage, 1000) |
| 129 | outgoingErrStream := make(chan error, 1000) |
| 130 | ctx, cancelFunc := context.WithCancel(context.Background()) |
| 131 | go func() { |
| 132 | defer close(outgoingLogStream) |
| 133 | defer close(outgoingErrStream) |
| 134 | |
| 135 | ts := latestEnvelopeTimestamp(client, outgoingErrStream, ctx, appGUID) |
| 136 | |
| 137 | // if the context was cancelled we may not have seen an envelope |
| 138 | if ts.IsZero() { |
| 139 | return |
| 140 | } |
| 141 | |
| 142 | const offset = 1 * time.Second |
| 143 | walkStartTime := ts.Add(-offset) |
| 144 | |
| 145 | logcache.Walk( |
| 146 | ctx, |
| 147 | appGUID, |
| 148 | logcache.Visitor(func(envelopes []*loggregator_v2.Envelope) bool { |
| 149 | logMessages := convertEnvelopesToLogMessages(envelopes) |
| 150 | for _, logMessage := range logMessages { |
| 151 | select { |
| 152 | case <-ctx.Done(): |
| 153 | return false |
| 154 | default: |
| 155 | outgoingLogStream <- *logMessage |
| 156 | } |
| 157 | } |
| 158 | return true |
| 159 | }), |
| 160 | client.Read, |
| 161 | logcache.WithWalkDelay(2*time.Second), |
| 162 | logcache.WithWalkStartTime(walkStartTime), |
| 163 | logcache.WithWalkEnvelopeTypes(logcache_v1.EnvelopeType_LOG), |
| 164 | logcache.WithWalkBackoff(newCliRetryBackoff(retryInterval, retryCount)), |
| 165 | logcache.WithWalkLogger(log.New(channelWriter{ |
| 166 | errChannel: outgoingErrStream, |
| 167 | }, "", 0)), |
| 168 | ) |
| 169 | }() |
| 170 | |
| 171 | return outgoingLogStream, outgoingErrStream, cancelFunc |
| 172 | } |
| 173 | |
| 174 | func latestEnvelopeTimestamp(client LogCacheClient, errs chan error, ctx context.Context, sourceID string) time.Time { |
| 175 |
no test coverage detected