| 54 | |
| 55 | |
| 56 | class Fork: |
class Response:
    """Result object carrying page content, child links, a status and a message.

    Instances are normally created through the ``success`` and ``error``
    factories rather than by calling the constructor directly.
    """

    def __init__(self, content: str, child_link_list: List[ChildLink], status, message: str):
        """Store the four fields as given; nothing is validated or copied."""
        self.content, self.child_link_list = content, child_link_list
        self.status, self.message = status, message

    @staticmethod
    def success(html_content: str, child_link_list: List[ChildLink]):
        """Return a response with status 200, the given content/links and an empty message."""
        return Fork.Response(
            content=html_content,
            child_link_list=child_link_list,
            status=200,
            message='',
        )

    @staticmethod
    def error(message: str):
        """Return a response with status 500, empty content, no links and ``message``."""
        return Fork.Response(
            content='',
            child_link_list=[],
            status=500,
            message=message,
        )
| 71 | |
def __init__(self, base_fork_url: str, selector_list: List[str]):
    """Normalise the starting URL and remember the content selectors.

    :param base_fork_url: URL to start from. The fragment is removed, trailing
        slashes are stripped from the path, path parameters (``;params``) are
        discarded and the query string is preserved.
    :param selector_list: selectors stored for later content extraction.
        ``None`` or empty entries are dropped; passing ``None`` for the whole
        list is treated as "no selectors".

    Sets ``base_fork_url`` (normalised URL), ``selector_list`` (filtered
    list), ``urlparse`` (parse result of the normalised URL) and ``base_url``
    (scheme and network location only).
    """
    parsed = urlparse(remove_fragment(base_fork_url))
    # One parse is enough: urlunparse appends '?query' only when the query is
    # non-empty, which matches re-attaching it by hand after a second parse.
    self.base_fork_url = urlunparse((
        parsed.scheme,
        parsed.netloc,
        parsed.path.rstrip('/'),
        '',  # params are intentionally dropped
        parsed.query,
        '',  # fragment
    ))
    # `or []` keeps construction from failing on None; other methods already
    # treat a missing selector list as "return the whole document".
    self.selector_list = [selector for selector in (selector_list or []) if selector]
    self.urlparse = urlparse(self.base_fork_url)
    self.base_url = ParseResult(
        scheme=self.urlparse.scheme,
        netloc=self.urlparse.netloc,
        path='',
        params='',
        query='',
        fragment='',
    ).geturl()
| 93 | |
def get_child_link_list(self, bf: BeautifulSoup):
    """Collect the ``<a>`` links of a parsed page that stay under the crawl prefix.

    The crawl prefix is ``self.base_fork_url``, or its parent directory when
    that URL ends in ``.html`` / ``.htm``.

    :param bf: parsed document to search for anchors.
    :return: list of ``ChildLink`` objects whose ``url`` starts with the
        crawl prefix.
    """
    crawl_prefix = self.base_fork_url
    if crawl_prefix.endswith(('.html', '.htm')):
        crawl_prefix = crawl_prefix.rsplit('/', 1)[0]

    # Accept hrefs that are not absolute/special-scheme links, or that start
    # with the crawl prefix, or that start with '/'. The prefix is a literal
    # URL, so it must be escaped: characters such as '.', '?', '+' or '('
    # would otherwise be read as regex syntax and either mis-match or raise.
    pattern = re.compile(
        "^((?!(http:|https:|tel:/|#|mailto:|javascript:))|" + re.escape(crawl_prefix) + "|/).*"
    )

    child_links = []
    for link in bf.find_all(name='a', href=pattern):
        href = link.get('href')
        # NOTE(review): hrefs without a leading '/' are appended directly to
        # base_url (scheme://netloc), not resolved against the page URL.
        # Confirm whether that is intended.
        url = href if href.startswith(self.base_url) else self.base_url + href
        child_links.append(ChildLink(url, link))

    # Filter on the ChildLink's own url attribute, as the constructor may
    # store a different value than the one passed in.
    return [row for row in child_links if row.url.startswith(crawl_prefix)]
| 105 | |
| 106 | def get_content_html(self, bf: BeautifulSoup): |
| 107 | if self.selector_list is None or len(self.selector_list) == 0: |
| 108 | return str(bf) |
| 109 | params = reduce(lambda x, y: {**x, **y}, |
| 110 | [{'class_': selector.replace('.', '')} if selector.startswith('.') else |
| 111 | {'id': selector.replace("#", "")} if selector.startswith("#") else {'name': selector} for |
| 112 | selector in |
| 113 | self.selector_list], {}) |
no outgoing calls
no test coverage detected