(self, provider_settings: dict)
| 66 | lock: asyncio.Lock = field(default_factory=asyncio.Lock) |
| 67 | |
| 68 | async def get(self, provider_settings: dict) -> str: |
| 69 | keys = provider_settings.get(self.setting_name, []) |
| 70 | if not keys: |
| 71 | raise ValueError( |
| 72 | f"Error: {self.provider_name} API key is not configured in AstrBot." |
| 73 | ) |
| 74 | |
| 75 | async with self.lock: |
| 76 | if self.index >= len(keys): |
| 77 | self.index = 0 |
| 78 | key = keys[self.index] |
| 79 | self.index = (self.index + 1) % len(keys) |
| 80 | return key |
| 81 | |
| 82 | |
| 83 | _TAVILY_KEY_ROTATOR = _KeyRotator("websearch_tavily_key", "Tavily") |
no outgoing calls
no test coverage detected