MCPcopy
hub / github.com/dataelement/Clawith / _tick

Function _tick

backend/app/services/trigger_daemon.py:94–208  ·  view source on GitHub ↗

One daemon tick: evaluate all triggers, group by agent, invoke.

()

Source from the content-addressed store, hash-verified

92# ── Main Tick Loop ──────────────────────────────────────────────────
93
94async def _tick():
95 """One daemon tick: evaluate all triggers, group by agent, invoke."""
96 new_trace_id()
97 now = datetime.now(timezone.utc)
98
99 async with async_session() as db:
100 result = await db.execute(
101 select(AgentTrigger).where(AgentTrigger.is_enabled == True)
102 )
103 all_triggers = result.scalars().all()
104 # Expunge each object before session.close() is called.
105 # session.close() expires all objects still in the identity map;
106 # explicit expunge() detaches them WITHOUT expiry so their scalar
107 # attributes remain readable outside the session context.
108 for _t in all_triggers:
109 db.expunge(_t)
110
111 if not all_triggers:
112 return
113
114
115 # Evaluate and enqueue due triggers. Agent invocation happens only after
116 # executions are claimed through the distributed execution queue.
117 for trigger in all_triggers:
118 # Auto-disable expired triggers
119 if trigger.expires_at and now >= trigger.expires_at:
120 async with async_session() as db:
121 result = await db.execute(select(AgentTrigger).where(AgentTrigger.id == trigger.id))
122 t = result.scalar_one_or_none()
123 if t:
124 t.is_enabled = False
125 await db.commit()
126 continue
127
128 try:
129 if await _evaluate_trigger(trigger, now):
130 handled = await _handle_okr_report_trigger(trigger, now)
131 if not handled:
132 handled = await _handle_okr_collection_trigger(trigger, now)
133 if not handled:
134 # Fix 3: Rate limit on_message triggers per agent
135 if trigger.type == "on_message":
136 agent_fires = _on_msg_fire_log.get(trigger.agent_id, [])
137 cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW)
138 recent = [t for t in agent_fires if t > cutoff]
139 if len(recent) >= _ON_MSG_RATE_LIMIT:
140 logger.warning(
141 f"[A2A Safety] Agent {trigger.agent_id} hit "
142 f"on_message rate limit ({_ON_MSG_RATE_LIMIT}/hr). "
143 f"Auto-disabling trigger '{trigger.name}'."
144 )
145 async with async_session() as db:
146 result = await db.execute(
147 select(AgentTrigger).where(AgentTrigger.id == trigger.id)
148 )
149 t_obj = result.scalar_one_or_none()
150 if t_obj:
151 t_obj.is_enabled = False

Callers 1

start_trigger_daemon · Function · 0.70

Calls 15

new_trace_id · Function · 0.90
enqueue_due_trigger · Function · 0.90
_evaluate_trigger · Function · 0.85
where · Method · 0.80
expunge · Method · 0.80
execute · Method · 0.45
all · Method · 0.45
scalars · Method · 0.45

Tested by

no test coverage detected