hub / github.com/reflex-dev/reflex / test_process_event_generator

Function test_process_event_generator

tests/units/test_state.py:1013–1037

Test event handlers that generate multiple updates.

Args:
    token: A token.
    mock_base_state_event_processor: The event processor.
    emitted_deltas: List to capture emitted deltas.

(
    token: str,
    mock_base_state_event_processor: BaseStateEventProcessor,
    emitted_deltas: list,
)

Source

@pytest.mark.asyncio
async def test_process_event_generator(
    token: str,
    mock_base_state_event_processor: BaseStateEventProcessor,
    emitted_deltas: list,
):
    """Test event handlers that generate multiple updates.

    Args:
        token: A token.
        mock_base_state_event_processor: The event processor.
        emitted_deltas: List to capture emitted deltas.
    """
    event = Event(
        name=f"{GenState.get_full_name()}.go",
        payload={"c": 5},
    )
    async with mock_base_state_event_processor as processor:
        await processor.enqueue(token, event)
    # Generator yields 5 deltas (one per increment).
    assert len(emitted_deltas) == 5
    for count, (delta_token, delta) in enumerate(emitted_deltas, 1):
        assert delta_token == token
        assert delta == {
            GenState.get_full_name(): {"value" + FIELD_MARKER: count},
        }
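The test depends on GenState having a generator-style `go` handler, which is not shown on this page. As a rough illustration of the pattern being asserted (one delta per increment, each tagged with the client token), here is a minimal library-free sketch. `ToyGenState`, `process`, the `"_marker"` suffix, and the `"token-1"` value are all hypothetical stand-ins, not the project's actual classes, processor, or `FIELD_MARKER` value.

```python
import asyncio
from typing import Iterator

# Placeholder suffix for illustration only; the real FIELD_MARKER is defined by the library.
FIELD_MARKER = "_marker"


class ToyGenState:
    """Hypothetical stand-in for GenState: a counter with a generator handler."""

    def __init__(self) -> None:
        self.value = 0

    @classmethod
    def get_full_name(cls) -> str:
        return "toy_gen_state"

    def go(self, c: int) -> Iterator[None]:
        # Each bare yield marks a point where an intermediate update is emitted.
        for _ in range(c):
            self.value += 1
            yield


async def process(token: str, state: ToyGenState, payload: dict, emitted: list) -> None:
    """Drive the handler and record one (token, delta) pair per yield."""
    for _ in state.go(**payload):
        delta = {state.get_full_name(): {"value" + FIELD_MARKER: state.value}}
        emitted.append((token, delta))
        await asyncio.sleep(0)  # hand control back to the event loop between updates


emitted_deltas: list = []
asyncio.run(process("token-1", ToyGenState(), {"c": 5}, emitted_deltas))
```

With a payload of `{"c": 5}` the handler yields five times, so the sketch records five deltas whose `value` field counts from 1 to 5, mirroring the shape of the assertions in the test above.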

Callers

nothing calls this directly

Calls 3

Event · Class · 0.90
get_full_name · Method · 0.80
enqueue · Method · 0.45

Tested by

no test coverage detected