MCPcopy Index your code
hub / github.com/AgentOps-AI/agentops / async_request

Method async_request

agentops/client/api/base.py:74–106  ·  view source on GitHub ↗

Make a generic async HTTP request Args: method: HTTP method (e.g., 'get', 'post', 'put', 'delete') path: API endpoint path data: Request payload (for POST, PUT methods) headers: Request headers timeout: Request timeout in

(
        self,
        method: str,
        path: str,
        data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 30,
    )

Source from the content-addressed store, hash-verified

72 return f"{self.endpoint}{path}"
73
74 async def async_request(
75 self,
76 method: str,
77 path: str,
78 data: Optional[Dict[str, Any]] = None,
79 headers: Optional[Dict[str, str]] = None,
80 timeout: int = 30,
81 ) -> Optional[Dict[str, Any]]:
82 """
83 Make a generic async HTTP request
84
85 Args:
86 method: HTTP method (e.g., 'get', 'post', 'put', 'delete')
87 path: API endpoint path
88 data: Request payload (for POST, PUT methods)
89 headers: Request headers
90 timeout: Request timeout in seconds
91
92 Returns:
93 JSON response as dictionary, or None if request failed
94
95 Raises:
96 Exception: If the request fails
97 """
98 url = self._get_full_url(path)
99
100 try:
101 response_data = await self.http_client.async_request(
102 method=method, url=url, data=data, headers=headers, timeout=timeout
103 )
104 return response_data
105 except Exception as e:
106 raise Exception(f"{method.upper()} request failed: {str(e)}") from e
107
108 async def post(self, path: str, data: Dict[str, Any], headers: Dict[str, str]) -> Optional[Dict[str, Any]]:
109 """

Callers 5

postMethod · 0.95
getMethod · 0.95
putMethod · 0.95
deleteMethod · 0.95
fetch_auth_tokenMethod · 0.45

Calls 1

_get_full_urlMethod · 0.95

Tested by

no test coverage detected