MCPcopy Index your code
hub / github.com/smittix/intercept / enqueue

Method enqueue

intercept_agent.py:257–274  ·  view source on GitHub ↗

Add data to push queue.

(self, scan_type: str, payload: dict, interface: str = None)

Source from the content-addressed store, hash-verified

255 self.stop_event = threading.Event()
256
257 def enqueue(self, scan_type: str, payload: dict, interface: str = None):
258 """Add data to push queue."""
259 if not self.cfg.push_enabled or not self.cfg.controller_url:
260 return
261
262 item = {
263 'agent_name': self.cfg.name,
264 'scan_type': scan_type,
265 'interface': interface,
266 'payload': payload,
267 'received_at': datetime.now(timezone.utc).isoformat(),
268 'attempts': 0,
269 }
270
271 try:
272 self.queue.put_nowait(item)
273 except queue.Full:
274 logger.warning("Push queue full, dropping payload")
275
276 def run(self):
277 """Main push loop."""

Callers 4

runMethod · 0.45
flush_pending_updatesFunction · 0.45
parse_sbs_streamFunction · 0.45
enqueue_manyMethod · 0.45

Calls 1

put_nowaitMethod · 0.80

Tested by 1

enqueue_manyMethod · 0.36