Send a single event into the publisher with payload dict "data" and event identifier "tag" The default is 1000 ms
(self, data, tag, timeout=1000)
| 891 | cb(None) |
| 892 | |
| 893 | def fire_event(self, data, tag, timeout=1000): |
| 894 | """ |
| 895 | Send a single event into the publisher with payload dict "data" and |
| 896 | event identifier "tag" |
| 897 | |
| 898 | The default is 1000 ms |
| 899 | """ |
| 900 | if self.opts.get("subproxy", False): |
| 901 | data["proxy_target"] = self.opts["id"] |
| 902 | |
| 903 | if not str(tag): # no empty tags allowed |
| 904 | raise ValueError("Empty tag.") |
| 905 | |
| 906 | if not isinstance(data, MutableMapping): # data must be dict |
| 907 | raise ValueError(f"Dict object expected, not '{data}'.") |
| 908 | |
| 909 | if not self.cpush: |
| 910 | if timeout is not None: |
| 911 | timeout_s = float(timeout) / 1000 |
| 912 | else: |
| 913 | timeout_s = None |
| 914 | if not self.connect_pull(timeout=timeout_s): |
| 915 | return False |
| 916 | |
| 917 | data["_stamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat() |
| 918 | salt.utils.tracing.inject(data) |
| 919 | salt.utils.metrics.counter( |
| 920 | "salt.events.fired", |
| 921 | description="Events placed on the salt event bus.", |
| 922 | ).add(1, attributes={"tag_prefix": _event_tag_prefix(tag)}) |
| 923 | event = self.pack(tag, data, max_size=self.opts["max_event_size"]) |
| 924 | msg = salt.utils.stringutils.to_bytes(event, "utf-8") |
| 925 | if self._run_io_loop_sync: |
| 926 | try: |
| 927 | self.pusher.publish(msg) |
| 928 | except Exception as exc: # pylint: disable=broad-except |
| 929 | log.debug( |
| 930 | "Publisher send failed with exception: %s", |
| 931 | exc, |
| 932 | exc_info_on_loglevel=logging.DEBUG, |
| 933 | ) |
| 934 | raise |
| 935 | else: |
| 936 | task = self.io_loop.create_task(self.pusher.publish(msg)) |
| 937 | self._publish_tasks.add(task) |
| 938 | task.add_done_callback(self._publish_tasks.discard) |
| 939 | return True |
| 940 | |
| 941 | def fire_master(self, data, tag, timeout=1000): |
| 942 | """' |