()
| 228 | } |
| 229 | |
| 230 | func (q *EventQueue) run() { |
| 231 | q.eventQueueOnce.Do(func() { |
| 232 | defer close(q.eventsClosed) |
| 233 | for ev := range q.events { |
| 234 | select { |
| 235 | case <-q.drain: |
| 236 | ev.stats.waitConsumeOffQueue.End(false) |
| 237 | close(ev.cancelled) |
| 238 | close(ev.eventResults) |
| 239 | ev.printStats(q) |
| 240 | default: |
| 241 | ev.stats.waitConsumeOffQueue.End(true) |
| 242 | ev.stats.durationStat.Start() |
| 243 | ev.Metadata.Handle(ev.eventResults) |
| 244 | // Always indicate success for now. |
| 245 | ev.stats.durationStat.End(true) |
| 246 | // Ensures that no more results can be sent as the event has |
| 247 | // already been processed. |
| 248 | ev.printStats(q) |
| 249 | close(ev.eventResults) |
| 250 | } |
| 251 | } |
| 252 | }) |
| 253 | } |
| 254 | |
| 255 | func (q *EventQueue) notSafeToAccess() bool { |
| 256 | return q == nil || q.close == nil || q.drain == nil || q.events == nil |
no test coverage detected