| 9 | |
| 10 | |
| 11 | class EventService(EventServiceInterface, BaseService): |
| 12 | |
| 13 | def __init__(self): |
| 14 | self.log = self.add_service('event_svc', self) |
| 15 | self.contact_svc = self.get_service('contact_svc') |
| 16 | self.ws_uri = 'ws://{}'.format(self.get_config('app.contact.websocket')) |
| 17 | self.global_listeners = [] |
| 18 | self.default_exchange = 'caldera' |
| 19 | self.default_queue = 'general' |
| 20 | |
| 21 | async def observe_event(self, callback, exchange=None, queue=None): |
| 22 | """ |
| 23 | Register a callback for a certain event. Callback is fired when |
| 24 | an event of that type is observed. |
| 25 | |
| 26 | :param callback: Callback function |
| 27 | :type callback: function |
| 28 | :param exchange: event exchange |
| 29 | :type exchange: str |
| 30 | :param queue: event queue |
| 31 | :type queue: str |
| 32 | """ |
| 33 | exchange = exchange or self.default_exchange |
| 34 | queue = queue or self.default_queue |
| 35 | path = '/'.join([exchange, queue]) |
| 36 | handle = _Handle(path, callback) |
| 37 | ws_contact = await self.contact_svc.get_contact('websocket') |
| 38 | ws_contact.handler.handles.append(handle) |
| 39 | |
| 40 | async def register_global_event_listener(self, callback): |
| 41 | """ |
| 42 | Register a global event listener that is fired when any event |
| 43 | is fired. |
| 44 | |
| 45 | :param callback: Callback function |
| 46 | :type callback: function |
| 47 | """ |
| 48 | self.global_listeners.append(callback) |
| 49 | |
| 50 | async def notify_global_event_listeners(self, event, **callback_kwargs): |
| 51 | """ |
| 52 | Notify all registered global event listeners when an event is fired. |
| 53 | |
| 54 | :param event: Event string (i.e. '<exchange>/<queue>') |
| 55 | :type event: str |
| 56 | """ |
| 57 | for c in self.global_listeners: |
| 58 | try: |
| 59 | c(event, **callback_kwargs) |
| 60 | except Exception as e: |
| 61 | self.log.error("Global callback error: {}".format(e), exc_info=True) |
| 62 | |
| 63 | async def handle_exceptions(self, awaitable): |
| 64 | try: |
| 65 | return await awaitable |
| 66 | except websockets.exceptions.ConnectionClosedOK: |
| 67 | pass # No handler was registered for this event |
| 68 | except Exception as e: |
no outgoing calls