Event handler for publish forwarder
(self, package)
| 1289 | self.ipc_publisher = ipc_publisher |
| 1290 | |
| 1291 | async def handle_event(self, package): |
| 1292 | """ |
| 1293 | Event handler for publish forwarder |
| 1294 | """ |
| 1295 | tag, data = salt.utils.event.SaltEvent.unpack(package) |
| 1296 | if tag.startswith("salt/job") and tag.endswith("/publish"): |
| 1297 | peer_id = data.pop("__peer_id", None) |
| 1298 | if peer_id: |
| 1299 | data.pop("_stamp", None) |
| 1300 | log.debug( |
| 1301 | "Event monitor forward job to publish server: jid=%s", |
| 1302 | data.get("jid", "no jid"), |
| 1303 | ) |
| 1304 | if not self.channels: |
| 1305 | for transport, opts in iter_transport_opts(self.opts): |
| 1306 | chan = salt.channel.server.PubServerChannel.factory(opts) |
| 1307 | self.channels.append(chan) |
| 1308 | tasks = [] |
| 1309 | for chan in self.channels: |
| 1310 | tasks.append(asyncio.create_task(chan.publish(data))) |
| 1311 | await asyncio.gather(*tasks) |
| 1312 | elif tag.startswith("salt/job") and "/new" in tag: |
| 1313 | # Cluster replication of job submissions: when a peer master |
| 1314 | # publishes a new job, mirror its ``minions`` list into our |
| 1315 | # local job cache so any CLI on this master can later look |
| 1316 | # up the jid without sharing ``cachedir``. |
| 1317 | # |
| 1318 | # Multi-ring gating: ``owns_for(opts, "jobs", jid)`` |
| 1319 | # consults the cluster-log routing snapshot. No routing |
| 1320 | # entry == broadcast (today's behaviour, every master |
| 1321 | # mirrors). A route to a ring this master hosts defers |
| 1322 | # to that ring's consistent hash; a route to a ring this |
| 1323 | # master does not belong to returns False, so non-members |
| 1324 | # skip the write. Delegate-on-miss: when the drop happens |
| 1325 | # AND we know the ring owner's address, forward the |
| 1326 | # write to that owner so a misconfigured topology |
| 1327 | # doesn't silently lose data. |
| 1328 | peer_id = data.pop("__peer_id", None) |
| 1329 | if peer_id and self.opts.get("cluster_id"): |
| 1330 | jid = data.get("jid") |
| 1331 | minions = data.get("minions") or [] |
| 1332 | if jid and salt.cluster.ring_membership.owns_for( |
| 1333 | self.opts, "jobs", jid |
| 1334 | ): |
| 1335 | try: |
| 1336 | salt.utils.job.store_minions(self.opts, jid, minions) |
| 1337 | except Exception: # pylint: disable=broad-except |
| 1338 | log.exception("Failed to mirror peer job submission %s", jid) |
| 1339 | elif jid: |
| 1340 | self._delegate_on_miss( |
| 1341 | "jobs", jid, "store_minions", {"jid": jid, "minions": minions} |
| 1342 | ) |
| 1343 | elif tag.startswith("salt/job") and "/ret/" in tag: |
| 1344 | # Cluster replication of job returns: minion responded to a |
| 1345 | # peer master; persist the return into our local cache so a |
| 1346 | # CLI on this master can deliver it to the user. |
| 1347 | # |
| 1348 | # Same multi-ring gating as the /new branch above. Once |
nothing calls this directly
no test coverage detected