()
| 180 | } |
| 181 | |
| 182 | func (nq *NetQuery) Start() error { |
| 183 | nq.mgr.Go("netquery connection feed listener", func(ctx *mgr.WorkerCtx) error { |
| 184 | sub, err := nq.db.Subscribe(query.New("network:")) |
| 185 | if err != nil { |
| 186 | return fmt.Errorf("failed to subscribe to network tree: %w", err) |
| 187 | } |
| 188 | defer close(nq.feed) |
| 189 | defer func() { |
| 190 | _ = sub.Cancel() |
| 191 | }() |
| 192 | |
| 193 | for { |
| 194 | select { |
| 195 | case <-ctx.Done(): |
| 196 | return nil |
| 197 | case rec, ok := <-sub.Feed: |
| 198 | if !ok { |
| 199 | return nil |
| 200 | } |
| 201 | |
| 202 | conn, ok := rec.(*network.Connection) |
| 203 | if !ok { |
| 204 | // This is fine as we also receive process updates on |
| 205 | // this channel. |
| 206 | continue |
| 207 | } |
| 208 | |
| 209 | nq.feed <- conn |
| 210 | } |
| 211 | } |
| 212 | }) |
| 213 | |
| 214 | nq.mgr.Go("netquery connection feed handler", func(ctx *mgr.WorkerCtx) error { |
| 215 | nq.mng.HandleFeed(ctx.Ctx(), nq.feed) |
| 216 | return nil |
| 217 | }) |
| 218 | |
| 219 | nq.mgr.Go("netquery live db cleaner", func(ctx *mgr.WorkerCtx) error { |
| 220 | for { |
| 221 | select { |
| 222 | case <-ctx.Done(): |
| 223 | return nil |
| 224 | case <-time.After(10 * time.Second): |
| 225 | threshold := time.Now().Add(-network.DeleteConnsAfterEndedThreshold) |
| 226 | count, err := nq.Store.Cleanup(ctx.Ctx(), threshold) |
| 227 | if err != nil { |
| 228 | log.Errorf("netquery: failed to removed old connections from live db: %s", err) |
| 229 | } else { |
| 230 | log.Tracef("netquery: successfully removed %d old connections from live db that ended before %s", count, threshold) |
| 231 | } |
| 232 | } |
| 233 | } |
| 234 | }) |
| 235 | |
| 236 | nq.mgr.Delay("network history cleaner delay", 10*time.Minute, func(w *mgr.WorkerCtx) error { |
| 237 | return nq.Store.CleanupHistory(w.Ctx()) |
| 238 | }).Repeat(1 * time.Hour) |
| 239 |
nothing calls this directly
no test coverage detected