wmReceiveMessage receives a message with timeout and asserts expected values
(t *testing.T, receiveChan <-chan *Message, expectBucket, expectKey string, timeout time.Duration)
| 76 | |
| 77 | // wmReceiveMessage receives a message with timeout and asserts expected values |
| 78 | func wmReceiveMessage(t *testing.T, receiveChan <-chan *Message, expectBucket, expectKey string, timeout time.Duration) *Message { |
| 79 | select { |
| 80 | case msg, ok := <-receiveChan: |
| 81 | if !ok { |
| 82 | t.Logf("the channel with subscriber id %+v is closed", expectKey) |
| 83 | return nil |
| 84 | } |
| 85 | assert.Equal(t, expectBucket, msg.BucketName) |
| 86 | assert.Equal(t, expectKey, msg.Key) |
| 87 | assert.NotNil(t, msg.Value) |
| 88 | return msg |
| 89 | case <-time.After(timeout): |
| 90 | t.Fatalf("timeout waiting for message on %s/%s", expectBucket, expectKey) |
| 91 | return nil |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | // wmReceiveMessages receives count messages and returns the count received |
| 96 | func wmReceiveMessages(t *testing.T, receiveChan <-chan *Message, expectBucket, expectKey string, expectCount int, timeout time.Duration) int { |
no outgoing calls
no test coverage detected