This method should be used as a threading target, start the actual minion side execution.
(cls, minion_instance, opts, data)
| 2841 | |
| 2842 | @classmethod |
| 2843 | def _thread_return(cls, minion_instance, opts, data): |
| 2844 | """ |
| 2845 | This method should be used as a threading target, start the actual |
| 2846 | minion side execution. |
| 2847 | """ |
| 2848 | loop = asyncio.new_event_loop() |
| 2849 | asyncio.set_event_loop(loop) |
| 2850 | |
| 2851 | salt.utils.tracing.configure(opts) |
| 2852 | salt.utils.metrics.configure({**opts, "__role": "minion"}) |
| 2853 | _exec_trace_ctx = ( |
| 2854 | salt.utils.tracing.extract(data) if isinstance(data, dict) else None |
| 2855 | ) |
| 2856 | _exec_fun = data.get("fun", "") if isinstance(data, dict) else "" |
| 2857 | _exec_span_cm = salt.utils.tracing.start_span( |
| 2858 | f"salt.minion.exec.{_exec_fun}" if _exec_fun else "salt.minion.exec", |
| 2859 | attributes={ |
| 2860 | "salt.fun": _exec_fun, |
| 2861 | "salt.jid": str(data.get("jid", "")) if isinstance(data, dict) else "", |
| 2862 | }, |
| 2863 | context=_exec_trace_ctx, |
| 2864 | ) |
| 2865 | # The span needs to outlive the existing try/finally below, which |
| 2866 | # makes a plain ``with`` block awkward; enter / exit manually. |
| 2867 | _exec_span_cm.__enter__() # pylint: disable=unnecessary-dunder-call |
| 2868 | _exec_span_exit_called = False |
| 2869 | # Companion histogram for the same wall-clock window the exec span |
| 2870 | # covers; recorded in the finally below. |
| 2871 | _exec_perf_start = time.perf_counter() |
| 2872 | |
| 2873 | minion_instance.gen_modules() |
| 2874 | |
| 2875 | fn_ = os.path.join(minion_instance.proc_dir, str(data["jid"])) |
| 2876 | |
| 2877 | try: |
| 2878 | if opts.get("multiprocessing", True): |
| 2879 | salt.utils.process.appendproctitle(f"{cls.__name__}._thread_return") |
| 2880 | |
| 2881 | sdata = {"pid": os.getpid()} |
| 2882 | sdata.update(data) |
| 2883 | log.info("Starting a new job %s with PID %s", data["jid"], sdata["pid"]) |
| 2884 | with salt.utils.files.fopen(fn_, "w+b") as fp_: |
| 2885 | fp_.write(salt.payload.dumps(sdata)) |
| 2886 | if data.get("start_event"): |
| 2887 | minion_instance._fire_start_event(data) |
| 2888 | ret = {"success": False} |
| 2889 | function_name = data["fun"] |
| 2890 | function_args = data["arg"] |
| 2891 | executors = ( |
| 2892 | data.get("module_executors") |
| 2893 | or getattr(minion_instance, "module_executors", []) |
| 2894 | or opts.get("module_executors", ["direct_call"]) |
| 2895 | ) |
| 2896 | allow_missing_funcs = any( |
| 2897 | [ |
| 2898 | minion_instance.executors[f"{executor}.allow_missing_func"]( |
| 2899 | function_name |
| 2900 | ) |