(self, as_client: httpx.AsyncClient, verb: str, endpoint_name: str, base_url: str, params: Dict[str, Any], data: Any, data_type: str)
| 100 | return token |
| 101 | |
| 102 | async def _query(self, as_client: httpx.AsyncClient, verb: str, endpoint_name: str, base_url: str, params: Dict[str, Any], data: Any, data_type: str) -> httpx.Response: |
| 103 | endpoint = self.loaded_endpoints[endpoint_name] |
| 104 | headers = endpoint.headers |
| 105 | if self.authentication_mode == "oauth": |
| 106 | token = await self._check_and_gen_authorization_token(as_client, self.creds) |
| 107 | headers = {**headers, "Authorization": f"OAuth {token}"} |
| 108 | |
| 109 | if verb == "GET": |
| 110 | req = await as_client.get(f"{self.scheme}://{self.hostname}{base_url}", |
| 111 | params=params, headers=headers, cookies=endpoint.cookies) |
| 112 | elif verb == "POST": |
| 113 | if data_type == "data": |
| 114 | req = await as_client.post(f"{self.scheme}://{self.hostname}{base_url}", |
| 115 | params=params, data=data, headers=headers, cookies=endpoint.cookies) |
| 116 | elif data_type == "json": |
| 117 | req = await as_client.post(f"{self.scheme}://{self.hostname}{base_url}", |
| 118 | params=params, json=data, headers=headers, cookies=endpoint.cookies) |
| 119 | else: |
| 120 | raise GHuntUnknownRequestDataTypeError(f"The provided data type {data_type} wasn't recognized by GHunt.") |
| 121 | else: |
| 122 | raise GHuntUnknownVerbError(f"The provided verb {verb} wasn't recognized by GHunt.") |
| 123 | |
| 124 | return req |
| 125 | |
| 126 | # Others |
| 127 |
no test coverage detected