MCPcopy
hub / github.com/reflex-dev/reflex / test_process_events

Function test_process_events

tests/units/test_app.py:1987–2028  ·  view source on GitHub ↗

Test that an event is processed properly and that it is postprocessed n+1 times. Also check that the processing flag of the last stateupdate is set to False. Args: token: a Token. app_module_mock: The mock for the app module, used to patch the app instance. mock_

(
    token: str,
    app_module_mock: unittest.mock.Mock,
    mock_base_state_event_processor: BaseStateEventProcessor,
    mock_root_event_context: EventContext,
    emitted_deltas: list[tuple[str, dict[str, dict[str, Any]]]],
)

Source from the content-addressed store, hash-verified

1985
1986@pytest.mark.asyncio
1987async def test_process_events(
1988 token: str,
1989 app_module_mock: unittest.mock.Mock,
1990 mock_base_state_event_processor: BaseStateEventProcessor,
1991 mock_root_event_context: EventContext,
1992 emitted_deltas: list[tuple[str, dict[str, dict[str, Any]]]],
1993):
1994 """Test that an event is processed properly and that it is postprocessed
1995 n+1 times. Also check that the processing flag of the last stateupdate is set to
1996 False.
1997
1998 Args:
1999 token: a Token.
2000 app_module_mock: The mock for the app module, used to patch the app instance.
2001 mock_base_state_event_processor: BaseStateEventProcessor Fixture.
2002 mock_root_event_context: The mock for the root event context, used to patch the app
2003 state manager.
2004 emitted_deltas: List to store emitted deltas.
2005 """
2006 event = Event(
2007 name=f"{GenState.get_name()}.go",
2008 payload={"c": 5},
2009 router_data={},
2010 )
2011
2012 async with mock_base_state_event_processor as processor:
2013 await processor.enqueue(
2014 token,
2015 event,
2016 )
2017
2018 if environment.REFLEX_OPLOCK_ENABLED.get():
2019 await mock_root_event_context.state_manager.close()
2020
2021 gen_state = await mock_root_event_context.state_manager.get_state(
2022 BaseStateToken(ident=token, cls=GenState),
2023 )
2024 assert isinstance(gen_state, GenState)
2025 assert gen_state.value == 5
2026 assert len(emitted_deltas) == 5
2027
2028 await mock_root_event_context.state_manager.close()
2029
2030
2031@pytest.fixture

Callers

nothing calls this directly

Calls 7

EventClass · 0.90
BaseStateTokenClass · 0.90
get_nameMethod · 0.80
enqueueMethod · 0.45
getMethod · 0.45
closeMethod · 0.45
get_stateMethod · 0.45

Tested by

no test coverage detected