MCPcopy
hub / github.com/reflex-dev/reflex / test_state_proxy

Function test_state_proxy

tests/units/test_state.py:2293–2415  ·  view source on GitHub ↗

Test that the state proxy works. Args: grandchild_state: A grandchild state. token: A token. attached_mock_base_state_event_processor: The event processor attached for this test. emitted_deltas: A list to capture emitted deltas. attached_mock_event_context: The event context attached for this test.

(
    grandchild_state: GrandchildState,
    token: str,
    attached_mock_base_state_event_processor: BaseStateEventProcessor,
    emitted_deltas: list[tuple[str, Mapping[str, Mapping[str, Any]]]],
    attached_mock_event_context: EventContext,
)

Source from the content-addressed store, hash-verified

2291
2292@pytest.mark.asyncio
2293async def test_state_proxy(
2294 grandchild_state: GrandchildState,
2295 token: str,
2296 attached_mock_base_state_event_processor: BaseStateEventProcessor,
2297 emitted_deltas: list[tuple[str, Mapping[str, Mapping[str, Any]]]],
2298 attached_mock_event_context: EventContext,
2299):
2300 """Test that the state proxy works.
2301
2302 Args:
2303 grandchild_state: A grandchild state.
2304 token: A token.
2305 attached_mock_base_state_event_processor: The event processor attached for this test.
2306 emitted_deltas: A list to capture emitted deltas.
2307 attached_mock_event_context: The event context attached for this test.
2308 """
2309 child_state = grandchild_state.parent_state
2310 assert child_state is not None
2311 parent_state = child_state.parent_state
2312 assert parent_state is not None
2313 router_data = RouterData.from_router_data({
2314 "query": {},
2315 "token": token,
2316 "sid": "test_sid",
2317 })
2318 grandchild_state.router = router_data
2319 state_manager = attached_mock_event_context.state_manager
2320 if isinstance(state_manager, (StateManagerMemory, StateManagerDisk)):
2321 state_manager.states[parent_state.router.session.client_token] = parent_state
2322 elif isinstance(state_manager, StateManagerRedis):
2323 pickle_state = parent_state._serialize()
2324 if pickle_state:
2325 await state_manager.redis.set(
2326 str(
2327 BaseStateToken(
2328 ident=parent_state.router.session.client_token,
2329 cls=type(parent_state),
2330 )
2331 ),
2332 pickle_state,
2333 ex=state_manager.token_expiration,
2334 )
2335
2336 sp = StateProxy(grandchild_state)
2337 assert sp.__wrapped__ == grandchild_state
2338 assert sp._self_substate_path == tuple(grandchild_state.get_full_name().split("."))
2339 assert not sp._self_mutable
2340 assert sp._self_actx is None
2341
2342 # cannot use normal contextmanager protocol
2343 with pytest.raises(TypeError), sp:
2344 pass
2345
2346 with pytest.raises(ImmutableStateError):
2347 # cannot directly modify state proxy outside of async context
2348 sp.value2 = "16"
2349
2350 with pytest.raises(ImmutableStateError):

Callers

nothing calls this directly

Calls 15

get_state · Method · 0.95
get_substate · Method · 0.95
BaseStateToken · Class · 0.90
StateProxy · Class · 0.90
_serialize · Method · 0.80
split · Method · 0.80
get_full_name · Method · 0.80
get_name · Method · 0.80
from_router_data · Method · 0.45
set · Method · 0.45
get · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected