A state manager that stores states on disk.
| 35 | |
| 36 | @dataclasses.dataclass |
| 37 | class StateManagerDisk(StateManager): |
| 38 | """A state manager that stores states on disk.""" |
| 39 | |
| 40 | # The mapping of client ids to states. |
| 41 | states: dict[str, Any] = dataclasses.field(default_factory=dict) |
| 42 | |
| 43 | # The mutex ensures the dict of mutexes is updated exclusively |
| 44 | _state_manager_lock: asyncio.Lock = dataclasses.field(default=asyncio.Lock()) |
| 45 | |
| 46 | # The dict of mutexes for each client |
| 47 | _states_locks: dict[str, asyncio.Lock] = dataclasses.field( |
| 48 | default_factory=dict, |
| 49 | init=False, |
| 50 | ) |
| 51 | |
| 52 | # The token expiration time (s). |
| 53 | token_expiration: int = dataclasses.field(default_factory=_default_token_expiration) |
| 54 | |
| 55 | # Last time a token was touched. |
| 56 | _token_last_touched: dict[str, float] = dataclasses.field( |
| 57 | default_factory=dict, |
| 58 | init=False, |
| 59 | ) |
| 60 | |
| 61 | # Pending writes |
| 62 | _write_queue: dict[StateToken, QueueItem] = dataclasses.field( |
| 63 | default_factory=dict, |
| 64 | init=False, |
| 65 | ) |
| 66 | _write_queue_task: asyncio.Task | None = None |
| 67 | _write_debounce_seconds: float = dataclasses.field( |
| 68 | default=environment.REFLEX_STATE_MANAGER_DISK_DEBOUNCE_SECONDS.get() |
| 69 | ) |
| 70 | |
| 71 | def __post_init__(self): |
| 72 | """Create a new state manager.""" |
| 73 | path_ops.mkdir(self.states_directory) |
| 74 | |
| 75 | self._purge_expired_states() |
| 76 | |
| 77 | @functools.cached_property |
| 78 | def states_directory(self) -> Path: |
| 79 | """Get the states directory. |
| 80 | |
| 81 | Returns: |
| 82 | The states directory. |
| 83 | """ |
| 84 | return prerequisites.get_states_dir() |
| 85 | |
| 86 | def _purge_expired_states(self): |
| 87 | """Purge expired states from the disk.""" |
| 88 | for path in path_ops.ls(self.states_directory): |
| 89 | # check path is a pickle file |
| 90 | if path.suffix != ".pkl": |
| 91 | continue |
| 92 | |
| 93 | # load last edited field from file |
| 94 | last_edited = path.stat().st_mtime |