MCPcopy
hub / github.com/reflex-dev/reflex / state_manager

Function state_manager

tests/units/conftest.py:230–260  ·  view source on GitHub ↗

Instance of state manager parametrized for in-process, disk, and redis. Args: request: pytest request object. mock_root_event_context: The mock root event context to use for the state manager. Yields: A state manager instance.

(
    request: pytest.FixtureRequest, mock_root_event_context: EventContext
)

Source from the content-addressed store, hash-verified

228 loop_scope="function", scope="function", params=["in_process", "disk", "redis"]
229)
async def state_manager(
    request: pytest.FixtureRequest, mock_root_event_context: EventContext
) -> AsyncGenerator[StateManager, None]:
    """Instance of state manager parametrized for in-process, disk, and redis.

    The selected manager is installed on ``mock_root_event_context`` while the
    test runs; the previous manager is put back during teardown, even if
    closing the test manager fails.

    Args:
        request: pytest request object; ``request.param`` selects the backend
            ("redis", "disk", anything else means in-process memory).
        mock_root_event_context: The mock root event context to use for the state manager.

    Yields:
        A state manager instance
    """
    # NOTE(review): create() runs for every param, but its result is discarded
    # in the disk and in-process branches below. Whether the discarded manager
    # holds resources that need closing is not visible from here - confirm.
    state_manager = StateManager.create()
    if request.param == "redis":
        # Reuse the created manager when it is already redis-backed; otherwise
        # build a redis manager on top of the mock redis client.
        if not isinstance(state_manager, StateManagerRedis):
            state_manager = StateManagerRedis(redis=mock_redis())
    elif request.param == "disk":
        # explicitly NOT using redis
        state_manager = StateManagerDisk()
        assert not state_manager._states_locks
    else:
        state_manager = StateManagerMemory()
        assert not state_manager._states_locks

    orig_state_manager = mock_root_event_context.state_manager
    # object.__setattr__ bypasses the context's own __setattr__ (presumably
    # because EventContext is frozen/immutable - TODO confirm).
    object.__setattr__(mock_root_event_context, "state_manager", state_manager)

    try:
        yield state_manager
    finally:
        try:
            await state_manager.close()
        finally:
            # Always restore the original manager, even when close() raises,
            # so a failing teardown cannot leak this manager into other tests.
            object.__setattr__(
                mock_root_event_context, "state_manager", orig_state_manager
            )
261
262
263@pytest.fixture

Callers

nothing calls this directly

Calls 7

closeMethod · 0.95
StateManagerRedisClass · 0.90
mock_redisFunction · 0.90
StateManagerDiskClass · 0.90
StateManagerMemoryClass · 0.90
createMethod · 0.45
__setattr__Method · 0.45

Tested by

no test coverage detected