hub / github.com/apache/caldera / fire_event

Method fire_event

app/service/event_svc.py:71–85
(self, exchange=None, queue=None, timestamp=True, **callback_kwargs)

Source from the content-addressed store, hash-verified

            self.log.error("WebSocket error: {}".format(e), exc_info=True)

    async def fire_event(self, exchange=None, queue=None, timestamp=True, **callback_kwargs):
        exchange = exchange or self.default_exchange
        queue = queue or self.default_queue
        metadata = {}
        if timestamp:
            metadata.update(dict(timestamp=datetime.now(timezone.utc).timestamp()))
        callback_kwargs.update(dict(metadata=metadata))
        uri = '/'.join([self.ws_uri, exchange, queue])
        if self.global_listeners:
            asyncio.get_event_loop().create_task(self.notify_global_event_listeners('/'.join([exchange, queue]),
                                                                                    **callback_kwargs))
        d = json.dumps(callback_kwargs)
        async with websockets.connect(uri) as websocket:
            asyncio.get_event_loop().create_task(self.handle_exceptions(websocket.send(d)))
            await asyncio.sleep(0)  # yield control to event loop


class _Handle:
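
The listing builds two things before it opens the WebSocket: a URI of the form ws_uri/exchange/queue, and a JSON body made of the caller's keyword arguments plus a metadata entry. The sketch below isolates just that step so it can be run without a server. The helper name build_event and the example URI are hypothetical and not part of the project; the body mirrors the statements shown in the listing but is not the project's code.

```python
import json
from datetime import datetime, timezone


def build_event(ws_uri, exchange, queue, timestamp=True, **callback_kwargs):
    """Hypothetical helper mirroring how fire_event derives its URI and JSON body."""
    metadata = {}
    if timestamp:
        # Seconds since the Unix epoch, taken from a timezone-aware UTC datetime.
        metadata['timestamp'] = datetime.now(timezone.utc).timestamp()
    # 'metadata' is always set, so a caller-supplied 'metadata' kwarg is overwritten.
    callback_kwargs['metadata'] = metadata
    uri = '/'.join([ws_uri, exchange, queue])
    # json.dumps raises TypeError if any kwarg value is not JSON-serializable.
    return uri, json.dumps(callback_kwargs)


# Example values are assumptions chosen for illustration only.
uri, body = build_event('ws://localhost:8080', 'operation', 'completed', op_id='abc')
print(uri)
print(body)
```

Two consequences follow from the listing itself: every keyword argument passed to fire_event must be JSON-serializable, and with timestamp=False the payload still carries an empty metadata object.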

Callers 8

run_tasks · Function · 0.45
close · Method · 0.45
add_fact · Method · 0.45
update_fact · Method · 0.45
handle_heartbeat · Method · 0.45

Calls 4

handle_exceptions · Method · 0.95
update · Method · 0.80
send · Method · 0.80

Tested by

no test coverage detected