Send a single event into the publisher with payload dict "data" and event identifier "tag" The default is 1000 ms
(self, data, tag, cb=None, timeout=1000)
| 847 | yield data |
| 848 | |
| 849 | async def fire_event_async(self, data, tag, cb=None, timeout=1000): |
| 850 | """ |
| 851 | Send a single event into the publisher with payload dict "data" and |
| 852 | event identifier "tag" |
| 853 | |
| 854 | The default is 1000 ms |
| 855 | """ |
| 856 | if self.opts.get("subproxy", False): |
| 857 | data["proxy_target"] = self.opts["id"] |
| 858 | |
| 859 | if not str(tag): # no empty tags allowed |
| 860 | raise ValueError("Empty tag.") |
| 861 | |
| 862 | if not isinstance(data, MutableMapping): # data must be dict |
| 863 | raise ValueError(f"Dict object expected, not '{data}'.") |
| 864 | |
| 865 | if not self.cpush: |
| 866 | if timeout is not None: |
| 867 | timeout_s = float(timeout) / 1000 |
| 868 | else: |
| 869 | timeout_s = None |
| 870 | if not self.connect_pull(timeout=timeout_s): |
| 871 | return False |
| 872 | |
| 873 | data["_stamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat() |
| 874 | salt.utils.tracing.inject(data) |
| 875 | salt.utils.metrics.counter( |
| 876 | "salt.events.fired", |
| 877 | description="Events placed on the salt event bus.", |
| 878 | ).add(1, attributes={"tag_prefix": _event_tag_prefix(tag)}) |
| 879 | event = self.pack(tag, data, max_size=self.opts["max_event_size"]) |
| 880 | msg = salt.utils.stringutils.to_bytes(event, "utf-8") |
| 881 | if self._run_io_loop_sync: |
| 882 | # pusher is a SyncWrapper; publish() runs synchronously. |
| 883 | self.pusher.publish(msg) |
| 884 | else: |
| 885 | await self.pusher.publish(msg) |
| 886 | if cb is not None: |
| 887 | warn_until( |
| 888 | 3009, |
| 889 | "The cb argument to fire_event_async will be removed in 3009", |
| 890 | ) |
| 891 | cb(None) |
| 892 | |
| 893 | def fire_event(self, data, tag, timeout=1000): |
| 894 | """ |
no test coverage detected