A state manager that stores states in memory.
| 19 | |
| 20 | @dataclasses.dataclass |
| 21 | class StateManagerMemory(StateManager): |
| 22 | """A state manager that stores states in memory.""" |
| 23 | |
| 24 | # The token expiration time (s). |
| 25 | token_expiration: int = dataclasses.field(default_factory=_default_token_expiration) |
| 26 | |
| 27 | # The mapping of client ids to states. |
| 28 | states: dict[str, Any] = dataclasses.field(default_factory=dict) |
| 29 | |
| 30 | # The mutex ensures the dict of mutexes is updated exclusively |
| 31 | _state_manager_lock: asyncio.Lock = dataclasses.field(default=asyncio.Lock()) |
| 32 | |
| 33 | # The dict of mutexes for each client |
| 34 | _states_locks: dict[str, asyncio.Lock] = dataclasses.field( |
| 35 | default_factory=dict, |
| 36 | init=False, |
| 37 | ) |
| 38 | |
| 39 | # The latest expiration deadline and token for each cache key. |
| 40 | _token_expires_at: dict[str, tuple[float, StateToken]] = dataclasses.field( |
| 41 | default_factory=dict, |
| 42 | init=False, |
| 43 | ) |
| 44 | |
| 45 | _expiration_task: asyncio.Task | None = dataclasses.field(default=None, init=False) |
| 46 | |
| 47 | def _get_or_create_state(self, token: StateToken[TOKEN_TYPE]) -> TOKEN_TYPE: |
| 48 | """Get an existing state or create a fresh one for a token. |
| 49 | |
| 50 | Args: |
| 51 | token: The normalized client token. |
| 52 | |
| 53 | Returns: |
| 54 | The state for the token. |
| 55 | """ |
| 56 | key = token.cache_key |
| 57 | if key not in self.states: |
| 58 | if isinstance(token, BaseStateToken): |
| 59 | self.states[key] = token.cls.get_root_state()( |
| 60 | _reflex_internal_init=True |
| 61 | ) |
| 62 | else: |
| 63 | self.states[key] = token.cls() |
| 64 | return cast(TOKEN_TYPE, self.states[key]) |
| 65 | |
| 66 | def _track_token(self, token: StateToken): |
| 67 | """Refresh the expiration deadline for an active token.""" |
| 68 | self._token_expires_at[token.cache_key] = ( |
| 69 | time.time() + self.token_expiration, |
| 70 | token, |
| 71 | ) |
| 72 | self._ensure_expiration_task() |
| 73 | |
| 74 | def _purge_token(self, token: StateToken): |
| 75 | """Remove a token from in-memory state bookkeeping. |
| 76 | |
| 77 | Args: |
| 78 | token: The token to purge. |
no outgoing calls