Process one incoming event. This method is overridden in subclasses to handle the handshake.
(self, event: Event)
| 692 | # Private methods |
| 693 | |
def process_event(self, event: Event) -> None:
    """
    Handle a single incoming event.

    Frames whose opcode is in ``DATA_OPCODES`` are put on
    ``self.recv_messages``; frames whose opcode is ``Opcode.PONG`` have
    their payload passed, as ``bytes``, to ``self.acknowledge_pings``.

    Subclasses override this method to handle the handshake.

    """
    frame = event
    # Only frames are expected here; the assertion also narrows the type.
    assert isinstance(frame, Frame)

    # The two checks are deliberately independent, not an if/elif chain.
    if frame.opcode in DATA_OPCODES:
        self.recv_messages.put(frame)

    if frame.opcode is Opcode.PONG:
        payload = bytes(frame.data)
        self.acknowledge_pings(payload)
| 707 | |
| 708 | def acknowledge_pings(self, data: bytes) -> None: |
| 709 | """ |
no test coverage detected