Test event handlers that generate multiple updates. Args: token: A token. mock_base_state_event_processor: The event processor. emitted_deltas: List to capture emitted deltas.
(
token: str,
mock_base_state_event_processor: BaseStateEventProcessor,
emitted_deltas: list,
)
| 1011 | |
| 1012 | @pytest.mark.asyncio |
| 1013 | async def test_process_event_generator( |
| 1014 | token: str, |
| 1015 | mock_base_state_event_processor: BaseStateEventProcessor, |
| 1016 | emitted_deltas: list, |
| 1017 | ): |
| 1018 | """Test event handlers that generate multiple updates. |
| 1019 | |
| 1020 | Args: |
| 1021 | token: A token. |
| 1022 | mock_base_state_event_processor: The event processor. |
| 1023 | emitted_deltas: List to capture emitted deltas. |
| 1024 | """ |
| 1025 | event = Event( |
| 1026 | name=f"{GenState.get_full_name()}.go", |
| 1027 | payload={"c": 5}, |
| 1028 | ) |
| 1029 | async with mock_base_state_event_processor as processor: |
| 1030 | await processor.enqueue(token, event) |
| 1031 | # Generator yields 5 deltas (one per increment). |
| 1032 | assert len(emitted_deltas) == 5 |
| 1033 | for count, (delta_token, delta) in enumerate(emitted_deltas, 1): |
| 1034 | assert delta_token == token |
| 1035 | assert delta == { |
| 1036 | GenState.get_full_name(): {"value" + FIELD_MARKER: count}, |
| 1037 | } |
| 1038 | |
| 1039 | |
| 1040 | def test_get_client_token(test_state, router_data): |
nothing calls this directly
no test coverage detected