(self, dork, pages=2)
| 61 | logger.error(str(ex)) |
| 62 | |
def search(self, dork, pages=2):
    """Query the Quake service-search API and collect ``ip:port`` targets.

    Args:
        dork: Search expression sent as the ``query`` field of the request.
        pages: Number of result pages to request (10 records per page).

    Returns:
        A set of ``"ip:port"`` strings. IPv6 addresses are wrapped in
        square brackets (``"[addr]:port"``). Any exception raised while
        paging is logged and the results gathered so far are returned.
    """
    url = "https://quake.360.cn/api/v3/search/quake_service"
    page_size = 10
    search_result = set()
    data = {"query": dork, "size": page_size,
            "ignore_cache": "false", "start": 0}
    try:
        for page in range(pages):
            # Pause before each request (presumably to stay under the
            # API rate limit).
            time.sleep(1)
            # Per the Quake API docs, `start` is a record offset rather than
            # a page number, so advance by a full page each time; sending
            # the page index made consecutive pages overlap.
            data['start'] = page * page_size
            resp = requests.post(url, json=data, headers=self.headers, timeout=60)
            # Parse the body once and only for successful HTTP responses.
            content = resp.json() if resp.status_code == 200 else None
            if isinstance(content, dict) and content.get('code') == 0:
                for match in content.get('data') or []:
                    ip = match['ip']
                    if is_ipv6_address_format(ip):
                        ip = f'[{ip}]'
                    search_result.add("%s:%s" % (ip, match['port']))
            else:
                logger.error("[PLUGIN] Quake:{}".format(resp.text))
    except Exception as ex:
        # Best-effort search: report the failure and keep partial results.
        logger.error(str(ex))
    return search_result
| 85 | |
| 86 | |
| 87 | if __name__ == "__main__": |
no test coverage detected