hub / github.com/sparckles/Robyn / named_event_generator

Function named_event_generator

examples/sse_example.py:90–102

Generator that yields named SSE events

()

Source from the content-addressed store, hash-verified

    """SSE endpoint with named events"""

    def named_event_generator():
        """Generator that yields named SSE events"""
        events = [
            ("user_joined", "Alice joined the chat"),
            ("message", "Hello everyone!"),
            ("user_left", "Bob left the chat"),
            ("message", "How is everyone doing?"),
            ("user_joined", "Charlie joined the chat"),
        ]

        for i, (event_type, message) in enumerate(events):
            yield SSEMessage(message, event=event_type, id=str(i))
            time.sleep(1.5)

    return SSEResponse(named_event_generator())
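To illustrate what a named event looks like on the wire, here is a minimal sketch that serializes the same kind of (event, message) pairs into the standard text/event-stream format. The helper format_sse is hypothetical and written only for this illustration; it is not Robyn's SSEMessage, whose exact output (field order, handling of extra fields) may differ.

```python
def format_sse(data, event=None, id=None):
    """Hypothetical helper: serialize one event as a text/event-stream frame."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    # Each line of a multi-line payload needs its own "data:" field.
    for chunk in str(data).split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


events = [
    ("user_joined", "Alice joined the chat"),
    ("message", "Hello everyone!"),
]

# Same enumerate-based ids as in the generator above.
frames = [
    format_sse(message, event=event_type, id=str(i))
    for i, (event_type, message) in enumerate(events)
]
print("".join(frames), end="")
```

On the client side, a browser EventSource dispatches frames that carry an event field to listeners registered under that name (for example "user_joined"), not to the default message handler, so each event type can be handled separately.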

Callers 1

stream_named_events · Function · 0.85

Calls 1

SSEMessage · Function · 0.90

Tested by

no test coverage detected