Set the state for a token. Args: token: The token to set the state for. state: The state to set. lock_id: If provided, the lock must be held with this value to set the state. context: The event context. Raises: LockExpired
(
self,
token: StateToken[TOKEN_TYPE],
state: TOKEN_TYPE,
*,
lock_id: bytes | None = None,
**context: Unpack[StateModificationContext],
)
| 351 | |
| 352 | @override |
| 353 | async def set_state( |
| 354 | self, |
| 355 | token: StateToken[TOKEN_TYPE], |
| 356 | state: TOKEN_TYPE, |
| 357 | *, |
| 358 | lock_id: bytes | None = None, |
| 359 | **context: Unpack[StateModificationContext], |
| 360 | ): |
| 361 | """Set the state for a token. |
| 362 | |
| 363 | Args: |
| 364 | token: The token to set the state for. |
| 365 | state: The state to set. |
| 366 | lock_id: If provided, the lock must be held with this value to set the state. |
| 367 | context: The event context. |
| 368 | |
| 369 | Raises: |
| 370 | LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID. |
| 371 | RuntimeError: If the state instance doesn't match the state name in the token. |
| 372 | """ |
| 373 | token = self._coerce_token(token) |
| 374 | # Check that we're holding the lock. |
| 375 | if ( |
| 376 | lock_id is not None |
| 377 | and (existing_lock_id := await self.redis.get(self._lock_key(token))) |
| 378 | != lock_id |
| 379 | ): |
| 380 | msg = ( |
| 381 | f"Lock expired for token {token} while processing. Consider increasing " |
| 382 | f"`app.state_manager.lock_expiration` (currently {self.lock_expiration}) " |
| 383 | "or use `@rx.event(background=True)` decorator for long-running tasks. " |
| 384 | f"Current lock id: {existing_lock_id!r}, expected lock id: {lock_id!r}." |
| 385 | + ( |
| 386 | f" Happened in event: {event.name}" |
| 387 | if (event := context.get("event")) is not None |
| 388 | else "" |
| 389 | ) |
| 390 | ) |
| 391 | raise LockExpiredError(msg) |
| 392 | |
| 393 | if not isinstance(token, BaseStateToken): |
| 394 | # Non-BaseState token: simple single-key write. |
| 395 | pickle_state = token.serialize(state) |
| 396 | if pickle_state: |
| 397 | await self.redis.set(str(token), pickle_state, ex=self.token_expiration) |
| 398 | return |
| 399 | |
| 400 | base_state = cast(BaseState, state) |
| 401 | |
| 402 | lock_key = token.lock_key |
| 403 | |
| 404 | if lock_id is not None and lock_key not in self._local_leases: |
| 405 | time_taken = ( |
| 406 | self.lock_expiration - (await self.redis.pttl(self._lock_key(token))) |
| 407 | ) / 1000 |
| 408 | if time_taken > self.lock_warning_threshold / 1000: |
| 409 | console.warn( |
| 410 | f"Lock for token {token} was held too long {time_taken=}s, " |
no test coverage detected