(self, exchange=None, queue=None, timestamp=True, **callback_kwargs)
| 69 | self.log.error("WebSocket error: {}".format(e), exc_info=True) |
| 70 | |
| 71 | async def fire_event(self, exchange=None, queue=None, timestamp=True, **callback_kwargs): |
| 72 | exchange = exchange or self.default_exchange |
| 73 | queue = queue or self.default_queue |
| 74 | metadata = {} |
| 75 | if timestamp: |
| 76 | metadata.update(dict(timestamp=datetime.now(timezone.utc).timestamp())) |
| 77 | callback_kwargs.update(dict(metadata=metadata)) |
| 78 | uri = '/'.join([self.ws_uri, exchange, queue]) |
| 79 | if self.global_listeners: |
| 80 | asyncio.get_event_loop().create_task(self.notify_global_event_listeners('/'.join([exchange, queue]), |
| 81 | **callback_kwargs)) |
| 82 | d = json.dumps(callback_kwargs) |
| 83 | async with websockets.connect(uri) as websocket: |
| 84 | asyncio.get_event_loop().create_task(self.handle_exceptions(websocket.send(d))) |
| 85 | await asyncio.sleep(0) # yield control to event loop |
| 86 | |
| 87 | |
| 88 | class _Handle: |
no test coverage detected