Get or create the global async session with optimized connection pooling
(cls)
| 69 | |
@classmethod
async def get_async_session(cls) -> Optional[aiohttp.ClientSession]:
    """Return the shared aiohttp session, building a new one when needed.

    The session cached on ``cls._async_session`` is handed back as long as
    it exists and is still open. When it is missing or already closed, a
    replacement is built with a pooled ``TCPConnector``, default JSON and
    User-Agent headers and a 30 second total timeout; it is then cached on
    the class and returned.

    Returns:
        The open ``aiohttp.ClientSession``, or ``None`` when
        ``AIOHTTP_AVAILABLE`` is falsy (a warning is logged in that case).
    """
    if not AIOHTTP_AVAILABLE:
        logger.warning("aiohttp not available, cannot create async session")
        return None

    # Fast path: the cached session is still usable.
    cached = cls._async_session
    if cached is not None and not cached.closed:
        return cached

    # Forget the stale reference before building the replacement. A session
    # that reports itself closed is only dropped here, never closed again.
    cls._async_session = None

    # Connection pool settings for the new session.
    pool = aiohttp.TCPConnector(
        limit=100,  # total connections across all hosts
        limit_per_host=30,  # connections allowed to a single host
        ttl_dns_cache=300,  # DNS cache TTL
        use_dns_cache=True,
        enable_cleanup_closed=True,
    )

    # Headers attached to every request made through the session.
    default_headers = {
        "Content-Type": "application/json",
        "User-Agent": f"agentops-python/{get_agentops_version() or 'unknown'}",
    }

    request_timeout = aiohttp.ClientTimeout(total=30)
    fresh = aiohttp.ClientSession(
        connector=pool,
        headers=default_headers,
        timeout=request_timeout,
    )
    cls._async_session = fresh
    return fresh
| 103 | |
| 104 | @classmethod |
| 105 | async def close_async_session(cls): |
no test coverage detected