MCPcopy
hub / github.com/probelabs/goreplay / startWorker

Method startWorker

output_binary.go:75–110  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

73}
74
75func (o *BinaryOutput) startWorker() {
76 client := NewTCPClient(o.address, &TCPClientConfig{
77 Debug: o.config.Debug,
78 Timeout: o.config.Timeout,
79 ResponseBufferSize: int(o.config.BufferSize),
80 })
81
82 deathCount := 0
83
84 atomic.AddInt64(&o.activeWorkers, 1)
85
86 for {
87 select {
88 case msg := <-o.queue:
89 o.sendRequest(client, msg)
90 deathCount = 0
91 case <-time.After(time.Millisecond * 100):
92 // When dynamic scaling enabled workers die after 2s of inactivity
93 if o.config.Workers == 0 {
94 deathCount++
95 } else {
96 continue
97 }
98
99 if deathCount > 20 {
100 workersCount := atomic.LoadInt64(&o.activeWorkers)
101
102 // At least 1 startWorker should be alive
103 if workersCount != 1 {
104 atomic.AddInt64(&o.activeWorkers, -1)
105 return
106 }
107 }
108 }
109 }
110}
111
112// PluginWrite writes a message tothis plugin
113func (o *BinaryOutput) PluginWrite(msg *Message) (n int, err error) {

Callers 1

workerMasterMethod · 0.95

Calls 2

sendRequestMethod · 0.95
NewTCPClientFunction · 0.85

Tested by

no test coverage detected