hub / github.com/saltstack/salt / handle_event

Method handle_event

salt/master.py:1291–1404

Event handler for publish forwarder

(self, package)

Source from the content-addressed store, hash-verified

        self.ipc_publisher = ipc_publisher

    async def handle_event(self, package):
        """
        Event handler for publish forwarder
        """
        tag, data = salt.utils.event.SaltEvent.unpack(package)
        if tag.startswith("salt/job") and tag.endswith("/publish"):
            peer_id = data.pop("__peer_id", None)
            if peer_id:
                data.pop("_stamp", None)
                log.debug(
                    "Event monitor forward job to publish server: jid=%s",
                    data.get("jid", "no jid"),
                )
                if not self.channels:
                    for transport, opts in iter_transport_opts(self.opts):
                        chan = salt.channel.server.PubServerChannel.factory(opts)
                        self.channels.append(chan)
                tasks = []
                for chan in self.channels:
                    tasks.append(asyncio.create_task(chan.publish(data)))
                await asyncio.gather(*tasks)
        elif tag.startswith("salt/job") and "/new" in tag:
            # Cluster replication of job submissions: when a peer master
            # publishes a new job, mirror its `minions` list into our
            # local job cache so any CLI on this master can later look
            # up the jid without sharing ``cachedir``.
            #
            # Multi-ring gating: ``owns_for(opts, "jobs", jid)``
            # consults the cluster-log routing snapshot. No routing
            # entry == broadcast (today's behaviour, every master
            # mirrors). A route to a ring this master hosts defers
            # to that ring's consistent hash; routed-to-a-ring-this-
            # master-is-not-in returns False so non-members no-op
            # the write. Delegate-on-miss: when the drop happens
            # AND we know the ring owner's address, forward the
            # write to that owner so a misconfigured topology
            # doesn't silently lose data.
            peer_id = data.pop("__peer_id", None)
            if peer_id and self.opts.get("cluster_id"):
                jid = data.get("jid")
                minions = data.get("minions") or []
                if jid and salt.cluster.ring_membership.owns_for(
                    self.opts, "jobs", jid
                ):
                    try:
                        salt.utils.job.store_minions(self.opts, jid, minions)
                    except Exception:  # pylint: disable=broad-except
                        log.exception("Failed to mirror peer job submission %s", jid)
                elif jid:
                    self._delegate_on_miss(
                        "jobs", jid, "store_minions", {"jid": jid, "minions": minions}
                    )
        elif tag.startswith("salt/job") and "/ret/" in tag:
            # Cluster replication of job returns: minion responded to a
            # peer master; persist the return into our local cache so a
            # CLI on this master can deliver it to the user.
            #
            # Same multi-ring gating as the /new branch above. Once
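The multi-ring gating and delegate-on-miss rules described in the comments of the `/new` branch can be illustrated with a small standalone sketch. This is not Salt's implementation: `ring_owner`, `owns`, `handle_write`, and the `routes`, `rings`, and `addresses` dictionaries are hypothetical names invented here, and the sketch assumes a single owner per key chosen by a plain hash ring, which the listing does not confirm.

```python
import bisect
import hashlib


def _position(value):
    """Map a string to a stable integer position on the hash ring."""
    return int(hashlib.sha256(value.encode()).hexdigest(), 16)


def ring_owner(members, key):
    """Return the member whose ring position is the first one after the key's."""
    points = sorted((_position(m), m) for m in members)
    positions = [p for p, _ in points]
    idx = bisect.bisect_right(positions, _position(key)) % len(points)
    return points[idx][1]


def owns(me, routes, rings, resource, key):
    """Hypothetical gating check mirroring the three cases in the comment."""
    ring_name = routes.get(resource)
    if ring_name is None:
        return True  # no routing entry: broadcast, every master mirrors
    members = rings.get(ring_name, [])
    if me not in members:
        return False  # routed to a ring this master is not in: no-op
    return ring_owner(members, key) == me  # defer to the ring's hash


def handle_write(me, routes, rings, addresses, resource, key, store, forward):
    """Store locally when owned; otherwise forward to a known owner address."""
    if owns(me, routes, rings, resource, key):
        store(key)
        return "stored"
    members = rings.get(routes.get(resource), [])
    owner = ring_owner(members, key) if members else None
    address = addresses.get(owner)
    if address:
        forward(address, key)  # delegate-on-miss
        return "forwarded"
    return "dropped"
```

With `routes = {"jobs": "ring-a"}` and `rings = {"ring-a": ["m1", "m2"]}`, exactly one of `m1` and `m2` stores a given jid, while a non-member such as `m3` forwards the write if the owner's address is known and drops it otherwise.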

Callers

nothing calls this directly

Calls 13

_delegate_on_miss · Method · 0.95
iter_transport_opts · Function · 0.90
unpack · Method · 0.80
debug · Method · 0.80
store_job · Method · 0.80
rotate_cluster_secret · Method · 0.80
trace · Method · 0.80
pop · Method · 0.45
get · Method · 0.45
factory · Method · 0.45
append · Method · 0.45
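Several of the calls listed above (`iter_transport_opts`, `factory`, `append`) belong to the `/publish` branch of `handle_event`, which builds its channel list lazily and then publishes to every channel concurrently. The pattern can be sketched in isolation as follows. `FakeChannel` and `Forwarder` are hypothetical stand-ins, not Salt classes, and the sketch copies the payload rather than mutating it in place as the method does.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a publish-server channel."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    async def publish(self, data):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self.sent.append(data)


class Forwarder:
    """Forwards peer-originated payloads to one channel per transport."""

    def __init__(self, transports):
        self.transports = transports
        self.channels = []  # built on first use, then reused

    async def forward(self, data):
        data = dict(data)  # assumption: copy instead of mutating the input
        peer_id = data.pop("__peer_id", None)
        if not peer_id:
            return False  # only events relayed from a peer are forwarded
        data.pop("_stamp", None)
        if not self.channels:
            self.channels = [FakeChannel(t) for t in self.transports]
        # gather() wraps each coroutine in a task, so the publishes run
        # concurrently and forward() resumes once all have finished.
        await asyncio.gather(*(chan.publish(data) for chan in self.channels))
        return True
```

Calling `asyncio.run(Forwarder(["tcp", "ws"]).forward(payload))` with a payload that carries `__peer_id` delivers the stripped payload to both fake channels; a payload without `__peer_id` is ignored.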

Tested by

no test coverage detected