MCPcopy
hub / github.com/reflex-dev/reflex / LinkedStateApp

Function LinkedStateApp

tests/integration/test_linked_state.py:20–269  ·  view source on GitHub ↗

Test that linked state works as expected.

()

Source from the content-addressed store, hash-verified

18
19
20def LinkedStateApp():
21 """Test that linked state works as expected."""
22 import uuid
23 from typing import Any
24
25 import reflex as rx
26
27 class SharedState(rx.SharedState):
28 _who: str = "world"
29 n_changes: int = 0
30 counter: int = 0
31 event_seen_linked_to: str = ""
32
33 @rx.event
34 def set_counter(self, value: int) -> None:
35 self.counter = value
36
37 @rx.event
38 def set_who(self, who: str) -> None:
39 self._who = who
40 self.n_changes += 1
41
42 @rx.event
43 async def link_to(self, token: str):
44 await self._link_to(token)
45
46 @rx.event
47 async def link_to_and_increment(self):
48 linked_state = await self._link_to(f"arbitrary-token-{uuid.uuid4()}")
49 linked_state.counter += 1
50
51 @rx.event
52 async def unlink(self):
53 return await self._unlink()
54
55 @rx.event
56 def record_event_link_status(self):
57 self.event_seen_linked_to = self._linked_to
58
59 @rx.event
60 def clear_event_link_status(self):
61 self.event_seen_linked_to = ""
62
63 @rx.event
64 async def on_load_link_default(self):
65 linked_state = await self._link_to(self.room or "default") # pyright: ignore[reportAttributeAccessIssue]
66 if self.room: # pyright: ignore[reportAttributeAccessIssue]
67 assert linked_state._linked_to == self.room # pyright: ignore[reportAttributeAccessIssue]
68 else:
69 assert linked_state._linked_to == "default"
70
71 @rx.event
72 async def handle_submit(self, form_data: dict[str, Any]):
73 if "who" in form_data:
74 self.set_who(form_data["who"])
75 if "token" in form_data:
76 await self.link_to(form_data["token"])
77

Callers

nothing calls this directly

Calls 1

add_pageMethod · 0.95

Tested by

no test coverage detected