(events *ebpf.Map)
| 149 | } |
| 150 | |
| 151 | func initPerfMap(events *ebpf.Map) error { |
| 152 | var err error |
| 153 | eventsReader, err = ringbuf.NewReader(events) |
| 154 | if err != nil { |
| 155 | return err |
| 156 | } |
| 157 | perfChan := make(chan []byte, ebpfCfg.QueueEventsSize) |
| 158 | |
| 159 | for i := 0; i < ebpfCfg.EventsWorkers; i++ { |
| 160 | go streamEventsWorker(i, perfChan, kernelEvents) |
| 161 | } |
| 162 | |
| 163 | go func(perfChan chan []byte, rd *ringbuf.Reader) { |
| 164 | // drainPerfChain drains the channel if it gets full. |
| 165 | // This can happen when there're too much events and the queue size is |
| 166 | // not big enough to hold all the events. |
| 167 | // To prevent blocking the ringbuf channel, we need to discard the events |
| 168 | // from the queue, so the ringbuf can continue sending events from kernel space. |
| 169 | drainPerfChan := func() { |
| 170 | for { |
| 171 | select { |
| 172 | case <-perfChan: |
| 173 | default: |
| 174 | return |
| 175 | } |
| 176 | } |
| 177 | } |
| 178 | |
| 179 | for { |
| 180 | select { |
| 181 | case <-ctxTasks.Done(): |
| 182 | goto Exit |
| 183 | default: |
| 184 | record, err := rd.Read() |
| 185 | if err != nil { |
| 186 | if errors.Is(err, ringbuf.ErrClosed) { |
| 187 | goto Exit |
| 188 | } |
| 189 | // XXX: control max errors? |
| 190 | log.Trace("[eBPF events] reader error: %s", err) |
| 191 | continue |
| 192 | } |
| 193 | |
| 194 | select { |
| 195 | case perfChan <- record.RawSample: |
| 196 | default: |
| 197 | log.Debug("[eBPF] events queue full (%d/%d), ringbuf record lost. Try increasing the queue size and/or the number of workers", len(perfChan), cap(perfChan)) |
| 198 | drainPerfChan() |
| 199 | } |
| 200 | } |
| 201 | } |
| 202 | Exit: |
| 203 | log.Debug("[eBPF events] reader closed") |
| 204 | }(perfChan, eventsReader) |
| 205 | |
| 206 | return nil |
| 207 | } |
| 208 |
no test coverage detected