()
| 73 | } |
| 74 | |
| 75 | func (o *BinaryOutput) startWorker() { |
| 76 | client := NewTCPClient(o.address, &TCPClientConfig{ |
| 77 | Debug: o.config.Debug, |
| 78 | Timeout: o.config.Timeout, |
| 79 | ResponseBufferSize: int(o.config.BufferSize), |
| 80 | }) |
| 81 | |
| 82 | deathCount := 0 |
| 83 | |
| 84 | atomic.AddInt64(&o.activeWorkers, 1) |
| 85 | |
| 86 | for { |
| 87 | select { |
| 88 | case msg := <-o.queue: |
| 89 | o.sendRequest(client, msg) |
| 90 | deathCount = 0 |
| 91 | case <-time.After(time.Millisecond * 100): |
| 92 | // When dynamic scaling enabled workers die after 2s of inactivity |
| 93 | if o.config.Workers == 0 { |
| 94 | deathCount++ |
| 95 | } else { |
| 96 | continue |
| 97 | } |
| 98 | |
| 99 | if deathCount > 20 { |
| 100 | workersCount := atomic.LoadInt64(&o.activeWorkers) |
| 101 | |
| 102 | // At least 1 startWorker should be alive |
| 103 | if workersCount != 1 { |
| 104 | atomic.AddInt64(&o.activeWorkers, -1) |
| 105 | return |
| 106 | } |
| 107 | } |
| 108 | } |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | // PluginWrite writes a message to this plugin |
| 113 | func (o *BinaryOutput) PluginWrite(msg *Message) (n int, err error) { |
no test coverage detected