Generator function that yields SSE-formatted messages
()
| 57 | """Basic SSE endpoint that sends a message every second""" |
| 58 | |
| 59 | def event_generator(): |
| 60 | """Yield ten numbered SSE messages stamped with the current time (HH:MM:SS), pausing one second after each, then a final 'end' event.""" |
| 61 | for i in range(10): |
| 62 | yield SSEMessage(f"Message {i} - {time.strftime('%H:%M:%S')}", id=str(i)) |
| 63 | time.sleep(1)  # wait one second before producing the next message |
| 64 | |
| 65 | # After the ten numbered messages, emit one last message whose event name is "end" |
| 66 | yield SSEMessage("Stream ended", event="end") |
| 67 | |
| 68 | return SSEResponse(event_generator()) |
| 69 |
no test coverage detected