Test that a background task calling reset is protected by the state proxy. Args: mock_app: An app that will be returned by `get_app()`. token: A token. mock_base_state_event_processor: The event processor. state_manager: A state manager instance.
(
mock_app: rx.App,
token: str,
mock_base_state_event_processor: BaseStateEventProcessor,
state_manager: StateManager,
)
| 2576 | |
@pytest.mark.asyncio
async def test_background_task_reset(
    mock_app: rx.App,
    token: str,
    mock_base_state_event_processor: BaseStateEventProcessor,
    state_manager: StateManager,
):
    """Check that a background task which calls reset goes through the state proxy.

    The test enqueues the ``background_task_reset`` event handler of
    ``BackgroundTaskState`` on the event processor, then reads the state back
    from the state manager and verifies that its ``order`` list contains only
    ``"reset"``.

    Args:
        mock_app: An app that will be returned by `get_app()`.
        token: The client token the event is enqueued for.
        mock_base_state_event_processor: The event processor used to run the event.
        state_manager: The state manager the resulting state is read from.
    """
    async with mock_base_state_event_processor as event_processor:
        reset_event = Event(
            name=f"{BackgroundTaskState.get_full_name()}.background_task_reset",
            payload={},
        )
        await event_processor.enqueue(token, reset_event)

    # NOTE(review): presumably closing the manager flushes oplock-held state
    # before it is read back below -- confirm against the StateManager docs.
    oplock_enabled = environment.REFLEX_OPLOCK_ENABLED.get()
    if oplock_enabled:
        await state_manager.close()

    state_token = BaseStateToken(ident=token, cls=BackgroundTaskState)
    final_state = await state_manager.get_state(state_token)

    assert isinstance(final_state, BackgroundTaskState)
    assert final_state.order == ["reset"]
| 2609 | |
| 2610 | |
| 2611 | @pytest.mark.asyncio |
nothing calls this directly
no test coverage detected