A state with a background task.
| 2416 | |
| 2417 | |
| 2418 | class BackgroundTaskState(BaseState): |
| 2419 | """A state with a background task.""" |
| 2420 | |
| 2421 | order: list[str] = [] |
| 2422 | dict_list: dict[str, list[int]] = {"foo": [1, 2, 3]} |
| 2423 | dc: ModelDC = ModelDC() |
| 2424 | |
| 2425 | @rx.var(cache=False) |
| 2426 | def computed_order(self) -> list[str]: |
| 2427 | """Get the order as a computed var. |
| 2428 | |
| 2429 | Returns: |
| 2430 | The value of 'order' var. |
| 2431 | """ |
| 2432 | return self.order |
| 2433 | |
| 2434 | @rx.event(background=True) |
| 2435 | async def background_task(self): |
| 2436 | """A background task that updates the state.""" |
| 2437 | async with self: |
| 2438 | assert not self.order |
| 2439 | self.order.append("background_task:start") |
| 2440 | |
| 2441 | assert isinstance(self, StateProxy) |
| 2442 | with pytest.raises(ImmutableStateError): |
| 2443 | self.order.append("bad idea") |
| 2444 | |
| 2445 | with pytest.raises(ImmutableStateError): |
| 2446 | # Cannot manipulate dataclass attributes. |
| 2447 | self.dc.foo = "baz" |
| 2448 | |
| 2449 | with pytest.raises(ImmutableStateError): |
| 2450 | # Even nested access to mutables raises an exception. |
| 2451 | self.dict_list["foo"].append(42) |
| 2452 | |
| 2453 | with pytest.raises(ImmutableStateError): |
| 2454 | # Cannot modify dataclass list attribute. |
| 2455 | self.dc.ls.append({"foo": "bar"}) |
| 2456 | |
| 2457 | with pytest.raises(ImmutableStateError): |
| 2458 | # Direct calling another handler that modifies state raises an exception. |
| 2459 | self.other() |
| 2460 | |
| 2461 | with pytest.raises(ImmutableStateError): |
| 2462 | # Calling other methods that modify state raises an exception. |
| 2463 | self._private_method() |
| 2464 | |
| 2465 | # wait for some other event to happen |
| 2466 | while len(self.order) == 1: |
| 2467 | await asyncio.sleep(0.01) |
| 2468 | async with self: |
| 2469 | pass # update proxy instance |
| 2470 | |
| 2471 | async with self: |
| 2472 | # Methods on ImmutableMutableProxy should return their wrapped return value. |
| 2473 | assert self.dict_list.pop("foo") == [1, 2, 3] |
| 2474 | |
| 2475 | self.order.append("background_task:stop") |