(subject: str, expires_delta: timedelta)
| 29 | |
| 30 | |
def _create_local_access_token(subject: str, expires_delta: timedelta) -> str:
    """Build an HS256-signed JWT for the local quickstart bootstrap.

    The token is assembled by hand as ``<header>.<payload>.<signature>``,
    where the signature is an HMAC-SHA256 over the first two segments keyed
    with ``settings.SECRET_KEY``.

    Args:
        subject: Value stored (stringified) in the ``sub`` claim.
        expires_delta: Lifetime of the token, added to the current UTC time
            to produce the ``exp`` claim.

    Returns:
        The three dot-separated token segments as a single string.

    Raises:
        RuntimeError: If ``settings.SIGNATURE_ALGORITHM`` is not ``"HS256"``.
    """
    from datetime import timezone  # noqa: PLC0415

    from skyvern.config import settings  # noqa: PLC0415

    if settings.SIGNATURE_ALGORITHM != "HS256":
        raise RuntimeError("Local quickstart token bootstrap only supports HS256 SIGNATURE_ALGORITHM")

    # Use a timezone-aware UTC datetime. ``datetime.utcnow()`` yields a naive
    # value, and ``.timestamp()`` treats naive datetimes as *local* time, which
    # shifts ``exp`` by the host's UTC offset on any non-UTC machine.
    expire = datetime.now(timezone.utc) + expires_delta
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"exp": int(expire.timestamp()), "sub": str(subject)}

    # Compact separators keep the serialized JSON free of whitespace.
    # NOTE(review): _base64url_encode is defined elsewhere in this file;
    # presumably it emits unpadded URL-safe base64 -- confirm.
    signing_input = ".".join(
        [
            _base64url_encode(json.dumps(header, separators=(",", ":")).encode("utf-8")),
            _base64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8")),
        ]
    )
    signature = hmac.new(
        settings.SECRET_KEY.encode("utf-8"),
        signing_input.encode("ascii"),
        hashlib.sha256,
    ).digest()
    return f"{signing_input}.{_base64url_encode(signature)}"
| 48 | |
| 49 | |
| 50 | async def get_or_create_local_organization(database: Any | None = None) -> Organization: |
no test coverage detected