Test webhook configuration and connectivity. Args: url: Webhook endpoint URL to test payload: Test payload data authorization_type: Type of authorization (none, bearer, basic, custom) authorization_key: Authorization key/token auth
(
self,
url: str,
payload: dict[str, Any],
authorization_type: str | AuthorizationType = AuthorizationType.NONE,
authorization_key: str | None = None,
authorization_header: str | None = None,
timeout: int = 10,
organization_id: str | None = None,
)
| 156 | raise |
| 157 | |
| 158 | def test_webhook( |
| 159 | self, |
| 160 | url: str, |
| 161 | payload: dict[str, Any], |
| 162 | authorization_type: str | AuthorizationType = AuthorizationType.NONE, |
| 163 | authorization_key: str | None = None, |
| 164 | authorization_header: str | None = None, |
| 165 | timeout: int = 10, |
| 166 | organization_id: str | None = None, |
| 167 | ) -> APIResponse: |
| 168 | """Test webhook configuration and connectivity. |
| 169 | |
| 170 | Args: |
| 171 | url: Webhook endpoint URL to test |
| 172 | payload: Test payload data |
| 173 | authorization_type: Type of authorization (none, bearer, basic, custom) |
| 174 | authorization_key: Authorization key/token |
| 175 | authorization_header: Custom authorization header |
| 176 | timeout: Request timeout in seconds |
| 177 | organization_id: Optional organization ID override |
| 178 | |
| 179 | Returns: |
| 180 | Test result with success status and response details |
| 181 | """ |
| 182 | # Convert authorization_type to string if it's an enum |
| 183 | auth_type_str = ( |
| 184 | authorization_type.value |
| 185 | if isinstance(authorization_type, AuthorizationType) |
| 186 | else authorization_type |
| 187 | ) |
| 188 | |
| 189 | data = { |
| 190 | "url": url, |
| 191 | "payload": payload, |
| 192 | "authorization_type": auth_type_str, |
| 193 | "timeout": timeout, |
| 194 | } |
| 195 | |
| 196 | # Add optional authorization |
| 197 | if authorization_key: |
| 198 | data["authorization_key"] = authorization_key |
| 199 | if authorization_header: |
| 200 | data["authorization_header"] = authorization_header |
| 201 | |
| 202 | logger.info(f"Testing webhook configuration for {url}") |
| 203 | logger.debug(f"Test webhook authorization_type: {auth_type_str}") |
| 204 | |
| 205 | try: |
| 206 | response = self.post( |
| 207 | self._build_url("webhook", "test/"), data, organization_id=organization_id |
| 208 | ) |
| 209 | |
| 210 | success = response.get("success", False) |
| 211 | logger.info(f"Webhook test for {url} {'succeeded' if success else 'failed'}") |
| 212 | return APIResponse( |
| 213 | success=success, data=response, status_code=response.get("status_code") |
| 214 | ) |
| 215 |
nothing calls this directly
no test coverage detected