(self)
| 291 | return json.loads(self.rfile.read(length)) if length else {} |
| 292 | |
def do_POST(self):
    """Handle POST requests for the ``/mitigate`` and ``/clear`` endpoints.

    ``/mitigate`` passes the decoded request body to ``add_flowspec_rule``
    and replies 200 with the uuid it returns.

    ``/clear`` reads the ``"uuid"`` key from the request body (defaulting
    to ``""``), passes it to ``delete_flowspec_rule`` and replies 200.
    The body must decode to a JSON object.

    Any exception raised while serving either endpoint is printed and
    reported to the client as a 400 response carrying ``str(exc)``.
    Every other path receives a 404 response.
    """
    if self.path == "/mitigate":
        try:
            params = self._read_body()
            rule_uuid = add_flowspec_rule(self.stub, params)
            print("[+] added rule %s: %s" % (rule_uuid[:12], params))
            self._send_json(200, {"status": "ok", "uuid": rule_uuid})
        except Exception as exc:
            # Handler boundary: report the failure to the client instead of
            # letting it propagate out of the request handler.
            print("[-] error adding rule: %s" % exc)
            self._send_json(400, {"status": "error", "message": str(exc)})

    elif self.path == "/clear":
        try:
            params = self._read_body()
            # A JSON array/string/number/null has no .get(); reject it with
            # a clear message rather than leaking an AttributeError text.
            if not isinstance(params, dict):
                raise ValueError("request body must be a JSON object")
            # NOTE(review): a missing "uuid" is forwarded as "" on purpose;
            # what an empty uuid means is decided by delete_flowspec_rule,
            # which is not visible here -- confirm before validating it.
            rule_uuid = params.get("uuid", "")
            delete_flowspec_rule(self.stub, rule_uuid)
            print("[+] removed rule %s" % rule_uuid[:12])
            self._send_json(200, {"status": "ok"})
        except Exception as exc:
            # Same boundary handling as /mitigate.
            print("[-] error removing rule: %s" % exc)
            self._send_json(400, {"status": "error", "message": str(exc)})

    else:
        self._send_json(404, {"status": "not found"})
| 317 | |
| 318 | def do_GET(self): |
| 319 | if self.path == "/rules": |
nothing calls this directly
no test coverage detected