MCPcopy
hub / github.com/reflex-dev/reflex / BackgroundTask

Function BackgroundTask

tests/integration/test_background_task.py:11–219  ·  view source on GitHub ↗

Test that background tasks work as expected.

()

Source from the content-addressed store, hash-verified

9
10
11def BackgroundTask():
12 """Test that background tasks work as expected."""
13 import asyncio
14
15 import pytest
16
17 import reflex as rx
18 from reflex.state import ImmutableStateError
19
20 class State(rx.State):
21 counter: int = 0
22 _task_id: int = 0
23 iterations: rx.Field[int] = rx.field(10)
24
@rx.event
def set_iterations(self, value: str):
    """Store a new iteration count parsed from a string.

    Args:
        value: Text holding the integer to assign to ``iterations``.
    """
    parsed = int(value)
    self.iterations = parsed
28
@rx.var
async def counter_async_cv(self) -> int:
    """Expose ``counter`` through an async var.

    Kept only so the integration test can check that background tasks
    trigger async var updates.

    Returns:
        The counter's current value.
    """
    current = self.counter
    return current
37
# Background event handler (registered with background=True).
# Increments `_task_id` once, then loops `int(self.iterations)` times
# incrementing `counter`; both writes sit under an `async with self` block.
# NOTE(review): indentation was lost in this listing, so the exact nesting of
# the `for` loop and of the 5 ms `asyncio.sleep` must be confirmed against the
# original file.
38 @rx.event(background=True)
39 async def handle_event(self):
40 async with self:
41 self._task_id += 1
42 for _ix in range(int(self.iterations)):
43 async with self:
44 self.counter += 1
45 await asyncio.sleep(0.005)
46
# Background event handler that yields other events instead of writing
# `counter` itself. `_task_id` is incremented under `async with self`; then,
# for each index in range(int(self.iterations)), even indices yield
# `State.increment_arbitrary(1)` and odd indices yield `State.increment()`.
# NOTE(review): indentation was lost in this listing, so whether the 5 ms
# `asyncio.sleep` runs per iteration or elsewhere must be confirmed against
# the original file.
47 @rx.event(background=True)
48 async def handle_event_yield_only(self):
49 async with self:
50 self._task_id += 1
51 for ix in range(int(self.iterations)):
52 if ix % 2 == 0:
53 yield State.increment_arbitrary(1)
54 else:
55 yield State.increment()
56 await asyncio.sleep(0.005)
57
@rx.event(background=True)
async def fast_yielding(self):
    """Yield ``State.increment`` 1000 times with no await between yields."""
    burst = 1000
    emitted = 0
    while emitted < burst:
        yield State.increment()
        emitted += 1
62
@rx.event
def increment(self):
    """Add one to ``counter``."""
    self.counter = self.counter + 1
66
67 @rx.event(background=True)
68 async def increment_arbitrary(self, amount: int):

Callers

nothing calls this directly

Calls 1

add_page · Method · 0.95

Tested by

no test coverage detected