wmDrainChannel drains all messages from a channel until it is closed or the timeout elapses
(t *testing.T, receiveChan <-chan *Message, timeout time.Duration)
| 118 | |
| 119 | // wmDrainChannel drains all messages from a channel until it's closed or timeout |
| 120 | func wmDrainChannel(t *testing.T, receiveChan <-chan *Message, timeout time.Duration) int { |
| 121 | drained := 0 |
| 122 | timeoutChan := time.After(timeout) |
| 123 | |
| 124 | for { |
| 125 | select { |
| 126 | case _, ok := <-receiveChan: |
| 127 | if !ok { |
| 128 | return drained |
| 129 | } |
| 130 | drained++ |
| 131 | case <-timeoutChan: |
| 132 | t.Logf("drain timeout after %d messages", drained) |
| 133 | return drained |
| 134 | } |
| 135 | } |
| 136 | } |
| 137 | |
| 138 | // wmVerifySubscriberExists checks if a subscriber exists in the lookup table |
| 139 | func wmVerifySubscriberExists(t *testing.T, wm *watchManager, bucket, key string, id core.BucketId) { |
no outgoing calls
no test coverage detected