Instance of state manager parametrized for redis and in-process. Args: request: pytest request object. mock_root_event_context: The mock root event context to use for the state manager. Yields: A state manager instance
(
request: pytest.FixtureRequest, mock_root_event_context: EventContext
)
| 228 | loop_scope="function", scope="function", params=["in_process", "disk", "redis"] |
| 229 | ) |
| 230 | async def state_manager( |
| 231 | request: pytest.FixtureRequest, mock_root_event_context: EventContext |
| 232 | ) -> AsyncGenerator[StateManager, None]: |
| 233 | """Instance of state manager parametrized for redis and in-process. |
| 234 | |
| 235 | Args: |
| 236 | request: pytest request object. |
| 237 | mock_root_event_context: The mock root event context to use for the state manager. |
| 238 | |
| 239 | Yields: |
| 240 | A state manager instance |
| 241 | """ |
| 242 | state_manager = StateManager.create() |
| 243 | if request.param == "redis": |
| 244 | if not isinstance(state_manager, StateManagerRedis): |
| 245 | state_manager = StateManagerRedis(redis=mock_redis()) |
| 246 | elif request.param == "disk": |
| 247 | # explicitly NOT using redis |
| 248 | state_manager = StateManagerDisk() |
| 249 | assert not state_manager._states_locks |
| 250 | else: |
| 251 | state_manager = StateManagerMemory() |
| 252 | assert not state_manager._states_locks |
| 253 | |
| 254 | orig_state_manager = mock_root_event_context.state_manager |
| 255 | object.__setattr__(mock_root_event_context, "state_manager", state_manager) |
| 256 | |
| 257 | yield state_manager |
| 258 | |
| 259 | await state_manager.close() |
| 260 | object.__setattr__(mock_root_event_context, "state_manager", orig_state_manager) |
| 261 | |
| 262 | |
| 263 | @pytest.fixture |
nothing calls this directly
no test coverage detected