MCPcopy Index your code
hub / github.com/AgentOps-AI/agentops / get_async_session

Method get_async_session

agentops/client/http/http_client.py:71–102  ·  view source on GitHub ↗

Get or create the global async session with optimized connection pooling

(cls)

Source from the content-addressed store, hash-verified

69
@classmethod
async def get_async_session(cls) -> Optional[aiohttp.ClientSession]:
    """Return the shared aiohttp session, building a new one when needed.

    The session is cached on the class as ``_async_session``. A cached
    session is reused as long as it is still open; if none exists yet, or
    the cached one has been closed, a replacement is created with a pooled
    TCP connector, default JSON headers and a total request timeout.

    Returns:
        The shared ``aiohttp.ClientSession``, or ``None`` (after logging a
        warning) when ``AIOHTTP_AVAILABLE`` is false.
    """
    if not AIOHTTP_AVAILABLE:
        logger.warning("aiohttp not available, cannot create async session")
        return None

    # Fast path: an open session already exists, so hand it back unchanged.
    cached = cls._async_session
    if cached is not None and not cached.closed:
        return cached

    # Drop any reference to a closed session before building its replacement.
    cls._async_session = None

    # Connection-pool settings for the connector backing the new session.
    pool_options = {
        "limit": 100,  # total connection pool size
        "limit_per_host": 30,  # per-host connection limit
        "ttl_dns_cache": 300,  # DNS cache TTL
        "use_dns_cache": True,
        "enable_cleanup_closed": True,
    }
    pooled_connector = aiohttp.TCPConnector(**pool_options)

    # Headers attached to every request made through the shared session.
    default_headers = {
        "Content-Type": "application/json",
        "User-Agent": f"agentops-python/{get_agentops_version() or 'unknown'}",
    }

    request_timeout = aiohttp.ClientTimeout(total=30)
    cls._async_session = aiohttp.ClientSession(
        connector=pooled_connector,
        headers=default_headers,
        timeout=request_timeout,
    )
    return cls._async_session
103
104 @classmethod
105 async def close_async_session(cls):

Callers 1

async_request — Method · 0.80

Calls 1

get_agentops_version — Function · 0.90

Tested by

no test coverage detected