()
| 459 | } |
| 460 | |
| 461 | func (router *WshRouter) processOneBacklogRound() { |
| 462 | router.lock.Lock() |
| 463 | defer router.lock.Unlock() |
| 464 | for linkId, backlog := range router.linkMsgBacklog { |
| 465 | lm := router.linkMap[linkId] |
| 466 | if lm == nil { |
| 467 | highWater := router.backlogHighWaterMark[linkId] |
| 468 | if highWater > 0 { |
| 469 | log.Printf("[router] backlog for linkid=%d cleared, link gone (highwater mark was %d messages)\n", linkId, highWater) |
| 470 | } |
| 471 | delete(router.linkMsgBacklog, linkId) |
| 472 | delete(router.backlogHighWaterMark, linkId) |
| 473 | continue |
| 474 | } |
| 475 | newBacklog := router.drainLinkBacklog_withLock(linkId, lm, backlog) |
| 476 | if len(newBacklog) == 0 { |
| 477 | highWater := router.backlogHighWaterMark[linkId] |
| 478 | if highWater > 0 { |
| 479 | log.Printf("[router] backlog for linkid=%d cleared (highwater mark was %d messages)\n", linkId, highWater) |
| 480 | } |
| 481 | delete(router.linkMsgBacklog, linkId) |
| 482 | delete(router.backlogHighWaterMark, linkId) |
| 483 | continue |
| 484 | } |
| 485 | router.linkMsgBacklog[linkId] = newBacklog |
| 486 | } |
| 487 | } |
| 488 | |
| 489 | func (router *WshRouter) processBacklog() { |
| 490 | defer func() { |
no test coverage detected