MCPcopy
hub / github.com/sparckles/Robyn / async_event_generator

Function async_event_generator

examples/sse_example.py:111–116  ·  view source on GitHub ↗

Async generator for SSE events

()

Source from the content-addressed store, hash-verified

109 """Async SSE endpoint demonstrating async generators"""
110
111 async def async_event_generator():
112 """Async generator for SSE events"""
113 for i in range(8):
114 # Simulate async work
115 await asyncio.sleep(0.5)
116 yield SSEMessage(f"Async message {i} - {time.strftime('%H:%M:%S')}", event="async", id=str(i))
117
118 return SSEResponse(async_event_generator())
119

Callers 1

stream_async_eventsFunction · 0.70

Calls 1

SSEMessageFunction · 0.90

Tested by

no test coverage detected