(ev *Event, sub *subscriber)
| 197 | } |
| 198 | |
| 199 | func sendEvent(ev *Event, sub *subscriber) { |
| 200 | go func(s *subscriber) { |
| 201 | evCopy := *ev |
| 202 | if s.autoAck { |
| 203 | s.Channel <- evCopy |
| 204 | return |
| 205 | } |
| 206 | evCopy.SetAckFunc(ackFunc(s, evCopy)) |
| 207 | evCopy.SetNackFunc(nackFunc(s, evCopy)) |
| 208 | s.Lock() |
| 209 | s.retryMap[evCopy.ID] = 0 |
| 210 | s.Unlock() |
| 211 | tick := time.NewTicker(s.ackWait) |
| 212 | defer tick.Stop() |
| 213 | for range tick.C { |
| 214 | s.Lock() |
| 215 | count, ok := s.retryMap[evCopy.ID] |
| 216 | s.Unlock() |
| 217 | if !ok { |
| 218 | // success |
| 219 | break |
| 220 | } |
| 221 | |
| 222 | if s.retryLimit > -1 && count > s.retryLimit { |
| 223 | if logger.V(logger.ErrorLevel, logger.DefaultLogger) { |
| 224 | logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) |
| 225 | } |
| 226 | s.Lock() |
| 227 | delete(s.retryMap, evCopy.ID) |
| 228 | s.Unlock() |
| 229 | return |
| 230 | } |
| 231 | s.Channel <- evCopy |
| 232 | s.Lock() |
| 233 | s.retryMap[evCopy.ID] = count + 1 |
| 234 | s.Unlock() |
| 235 | } |
| 236 | }(sub) |
| 237 | } |
| 238 | |
| 239 | func ackFunc(s *subscriber, evCopy Event) func() error { |
| 240 | return func() error { |
no test coverage detected
searching dependent graphs…