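Take a given load and perform the steps needed to prepare a publication. TODO: These steps are grouped together only because they run at the same point in time (temporal cohesion), not because they form one logical unit, so this method should be refactored further into smaller pieces.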
(self, minions, jid, clear_load, extra, missing)
| 4172 | threading.Thread(target=self.ssh_client.cmd, kwargs=load).start() |
| 4173 | |
| 4174 | def _prep_pub(self, minions, jid, clear_load, extra, missing): |
| 4175 | """ |
| 4176 | Take a given load and perform the necessary steps |
| 4177 | to prepare a publication. |
| 4178 | |
| 4179 | TODO: This is really only bound by temporal cohesion |
| 4180 | and thus should be refactored even further. |
| 4181 | """ |
| 4182 | clear_load["jid"] = jid |
| 4183 | delimiter = clear_load.get("kwargs", {}).get("delimiter", DEFAULT_TARGET_DELIM) |
| 4184 | |
| 4185 | new_job_load = { |
| 4186 | "jid": clear_load["jid"], |
| 4187 | "tgt_type": clear_load["tgt_type"], |
| 4188 | "tgt": clear_load["tgt"], |
| 4189 | "user": clear_load["user"], |
| 4190 | "fun": clear_load["fun"], |
| 4191 | "arg": clear_load["arg"], |
| 4192 | "minions": minions, |
| 4193 | "missing": missing, |
| 4194 | } |
| 4195 | |
| 4196 | # Announce the job on the event bus |
| 4197 | self.event.fire_event(new_job_load, tagify([clear_load["jid"], "new"], "job")) |
| 4198 | |
| 4199 | if self.opts["ext_job_cache"]: |
| 4200 | fstr = "{}.save_load".format(self.opts["ext_job_cache"]) |
| 4201 | save_load_func = True |
| 4202 | |
| 4203 | # Get the returner's save_load arg_spec. |
| 4204 | try: |
| 4205 | arg_spec = salt.utils.args.get_function_argspec( |
| 4206 | self.mminion.returners[fstr] |
| 4207 | ) |
| 4208 | |
| 4209 | # Check if 'minions' is included in returner's save_load arg_spec. |
| 4210 | # This may be missing in custom returners, which we should warn about. |
| 4211 | if "minions" not in arg_spec.args: |
| 4212 | log.critical( |
| 4213 | "The specified returner used for the external job cache " |
| 4214 | "'%s' does not have a 'minions' kwarg in the returner's " |
| 4215 | "save_load function.", |
| 4216 | self.opts["ext_job_cache"], |
| 4217 | ) |
| 4218 | except (AttributeError, KeyError): |
| 4219 | save_load_func = False |
| 4220 | log.critical( |
| 4221 | "The specified returner used for the external job cache " |
| 4222 | '"%s" does not have a save_load function!', |
| 4223 | self.opts["ext_job_cache"], |
| 4224 | ) |
| 4225 | |
| 4226 | if save_load_func: |
| 4227 | try: |
| 4228 | self.mminion.returners[fstr]( |
| 4229 | clear_load["jid"], clear_load, minions=minions |
| 4230 | ) |
| 4231 | except Exception: # pylint: disable=broad-except |