Generator that yields named SSE events
named_event_generator()
| 88 | """SSE endpoint with named events""" |
| 89 | |
| 90 | def named_event_generator(): |
| 91 | """Generator that yields named SSE events""" |
| 92 | events = [ |
| 93 | ("user_joined", "Alice joined the chat"), |
| 94 | ("message", "Hello everyone!"), |
| 95 | ("user_left", "Bob left the chat"), |
| 96 | ("message", "How is everyone doing?"), |
| 97 | ("user_joined", "Charlie joined the chat"), |
| 98 | ] |
| 99 | |
| 100 | for i, (event_type, message) in enumerate(events): |
| 101 | yield SSEMessage(message, event=event_type, id=str(i)) |
| 102 | time.sleep(1.5) |
| 103 | |
| 104 | return SSEResponse(named_event_generator()) |
| 105 |
no test coverage detected