(self, dork, pages=2, pagesize=20, search_type="v4")
| 76 | logger.error(str(ex)) |
| 77 | |
| 78 | def search(self, dork, pages=2, pagesize=20, search_type="v4"): |
| 79 | search_result = set() |
| 80 | if kb.comparison: |
| 81 | kb.comparison.add_dork("Zoomeye", dork) |
| 82 | try: |
| 83 | for page in range(1, pages + 1): |
| 84 | time.sleep(1) |
| 85 | url = f'{self.url}/v2/search' |
| 86 | data = { |
| 87 | "qbase64": b64encode(dork.encode('utf-8')).decode('utf-8'), |
| 88 | "page": page, |
| 89 | "pagesize": pagesize, |
| 90 | "sub_type": search_type, |
| 91 | "fields": "ip,port,domain,service,honeypot" |
| 92 | } |
| 93 | |
| 94 | resp = requests.post(url, headers=self.headers, timeout=60, json=data) |
| 95 | content = resp.json() |
| 96 | if resp and resp.status_code == 200 and content.get("code", None) == 60000: |
| 97 | |
| 98 | for match in content['data']: |
| 99 | if match['domain']: |
| 100 | url = match['domain'] |
| 101 | else: |
| 102 | host = match['ip'] |
| 103 | port = match['port'] |
| 104 | url = f'[{host}]:{port}' if is_ipv6_address_format(host) else f'{host}:{port}' |
| 105 | scheme = '' |
| 106 | if match['service']: |
| 107 | scheme = str(match['service'].split('/')[-1]) |
| 108 | if scheme: |
| 109 | url = f'{scheme}://{url}' |
| 110 | |
| 111 | search_result.add(url) |
| 112 | if kb.comparison: |
| 113 | honeypot = False |
| 114 | if match['honeypot'] == 1: |
| 115 | honeypot = True |
| 116 | kb.comparison.add_ip(match['ip'], "Zoomeye", honeypot) |
| 117 | except Exception as ex: |
| 118 | logger.error(str(ex)) |
| 119 | return search_result |
| 120 | |
| 121 | |
| 122 | if __name__ == "__main__": |
no test coverage detected