hub / github.com/reflex-dev/reflex / test_background_task_reset

Function test_background_task_reset

tests/units/test_state.py:2578–2608

Test that a background task calling reset is protected by the state proxy.

Args:
    mock_app: An app that will be returned by `get_app()`.
    token: A token.
    mock_base_state_event_processor: The event processor.
    state_manager: A state manager instance.

(
    mock_app: rx.App,
    token: str,
    mock_base_state_event_processor: BaseStateEventProcessor,
    state_manager: StateManager,
)

Source from the content-addressed store, hash-verified

@pytest.mark.asyncio
async def test_background_task_reset(
    mock_app: rx.App,
    token: str,
    mock_base_state_event_processor: BaseStateEventProcessor,
    state_manager: StateManager,
):
    """Test that a background task calling reset is protected by the state proxy.

    Args:
        mock_app: An app that will be returned by `get_app()`
        token: A token.
        mock_base_state_event_processor: The event processor.
        state_manager: A state manager instance.
    """
    async with mock_base_state_event_processor as processor:
        await processor.enqueue(
            token,
            Event(
                name=f"{BackgroundTaskState.get_full_name()}.background_task_reset",
                payload={},
            ),
        )

    if environment.REFLEX_OPLOCK_ENABLED.get():
        await state_manager.close()

    background_task_state = await state_manager.get_state(
        BaseStateToken(ident=token, cls=BackgroundTaskState)
    )
    assert isinstance(background_task_state, BackgroundTaskState)
    assert background_task_state.order == ["reset"]
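The test only enqueues the `background_task_reset` event and checks the final `order`; the handler body and the proxy itself are not shown on this page. As a rough illustration of what "protected by the state proxy" can mean, here is a minimal pure-Python sketch. It is not Reflex's implementation: `StateProxy`, `CounterState`, `ImmutableStateError`, and the handler body are all hypothetical stand-ins, and wrapping of nested mutable values such as lists is deliberately left out.

```python
import asyncio


class ImmutableStateError(Exception):
    """Hypothetical error: state was mutated outside `async with proxy`."""


class CounterState:
    """Hypothetical stand-in for a state class with a single field."""

    def __init__(self) -> None:
        self.order: list[str] = []

    def reset(self) -> None:
        # Restore the field to its default value.
        self.order = []


class StateProxy:
    """Hypothetical proxy: blocks reset() and attribute writes unless locked."""

    def __init__(self, state: CounterState, lock: asyncio.Lock) -> None:
        # Bypass our own __setattr__ guard for the proxy's internals.
        object.__setattr__(self, "_state", state)
        object.__setattr__(self, "_lock", lock)
        object.__setattr__(self, "_mutable", False)

    async def __aenter__(self) -> "StateProxy":
        await self._lock.acquire()
        object.__setattr__(self, "_mutable", True)
        return self

    async def __aexit__(self, *exc_info) -> None:
        object.__setattr__(self, "_mutable", False)
        self._lock.release()

    def __getattr__(self, name: str):
        # Only reached for names not found on the proxy itself.
        if name == "reset" and not self._mutable:
            raise ImmutableStateError("reset() requires `async with proxy`")
        return getattr(self._state, name)

    def __setattr__(self, name: str, value) -> None:
        if not self._mutable:
            raise ImmutableStateError(f"cannot set {name!r} outside the lock")
        setattr(self._state, name, value)


async def background_task_reset(proxy: StateProxy) -> None:
    """Assumed handler shape: reset is rejected outside the lock, allowed inside."""
    try:
        proxy.reset()
    except ImmutableStateError:
        pass  # expected: the proxy guards reset()
    else:
        raise AssertionError("reset() outside the lock should be blocked")

    async with proxy:
        proxy.order.append("foo")
        proxy.reset()  # clears "foo"
    async with proxy:
        proxy.order.append("reset")


async def main() -> list[str]:
    state = CounterState()
    proxy = StateProxy(state, asyncio.Lock())
    await background_task_reset(proxy)
    return state.order


result = asyncio.run(main())
print(result)
```

In this sketch the final value is `["reset"]`, matching the shape of the assertion in the test: anything written before the guarded `reset()` is discarded, and only the entry appended afterwards survives.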

Callers

nothing calls this directly

Calls 7

Event · Class · 0.90
BaseStateToken · Class · 0.90
get_full_name · Method · 0.80
enqueue · Method · 0.45
get · Method · 0.45
close · Method · 0.45
get_state · Method · 0.45

Tested by

no test coverage detected