| 30 | pass |
| 31 | |
| 32 | class WebScrappingStrategy(ContentScrappingStrategy): |
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
    """Scrape ``html`` (the page fetched from ``url``) synchronously.

    All keyword arguments are forwarded unchanged to
    ``_get_content_of_website_optimized``, together with ``is_async=False``.

    Returns:
        Whatever ``_get_content_of_website_optimized`` returns.
    """
    extract = self._get_content_of_website_optimized
    return extract(url, html, is_async=False, **kwargs)
| 35 | |
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
    """Scrape ``html`` (the page fetched from ``url``) without blocking the event loop.

    The extraction is handed to ``asyncio.to_thread`` so it runs in a worker
    thread; keyword arguments are forwarded unchanged to
    ``_get_content_of_website_optimized``.

    Returns:
        Whatever ``_get_content_of_website_optimized`` returns.
    """
    extract = self._get_content_of_website_optimized
    result = await asyncio.to_thread(extract, url, html, **kwargs)
    return result
| 38 | |
| 39 | def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: |
| 40 | success = True |
| 41 | if not html: |
| 42 | return None |
| 43 | |
| 44 | soup = BeautifulSoup(html, 'html.parser') |
| 45 | body = soup.body |
| 46 | |
| 47 | |
| 48 | image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) |
| 49 | |
| 50 | for tag in kwargs.get('excluded_tags', []) or []: |
| 51 | for el in body.select(tag): |
| 52 | el.decompose() |
| 53 | |
| 54 | if css_selector: |
| 55 | selected_elements = body.select(css_selector) |
| 56 | if not selected_elements: |
| 57 | return { |
| 58 | 'markdown': '', |
| 59 | 'cleaned_html': '', |
| 60 | 'success': True, |
| 61 | 'media': {'images': [], 'videos': [], 'audios': []}, |
| 62 | 'links': {'internal': [], 'external': []}, |
| 63 | 'metadata': {}, |
| 64 | 'message': f"No elements found for CSS selector: {css_selector}" |
| 65 | } |
| 66 | # raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}") |
| 67 | body = soup.new_tag('div') |
| 68 | for el in selected_elements: |
| 69 | body.append(el) |
| 70 | |
| 71 | links = {'internal': [], 'external': []} |
| 72 | media = {'images': [], 'videos': [], 'audios': []} |
| 73 | internal_links_dict = {} |
| 74 | external_links_dict = {} |
| 75 | |
| 76 | # Extract meaningful text for media files from closest parent |
def find_closest_parent_with_useful_text(tag):
    """Return the text of the nearest ancestor of ``tag`` that has enough words.

    Walks upward through ``.parent`` links, starting at the parent of ``tag``
    (the tag itself is never inspected). The first ancestor whose
    whitespace-split text has at least ``image_description_min_word_threshold``
    words wins, and its text is returned. Returns ``None`` when ``tag`` is
    falsy or no ancestor qualifies.
    """
    ancestor = tag.parent if tag else None
    while ancestor:
        ancestor_text = ancestor.get_text(separator=' ', strip=True)
        # The threshold is the image-description one taken from the enclosing
        # scope, not the page-level word_count_threshold.
        if len(ancestor_text.split()) >= image_description_min_word_threshold:
            return ancestor_text
        ancestor = ancestor.parent
    return None
| 88 | |
| 89 | def process_image(img, url, index, total_images): |
no outgoing calls
no test coverage detected
searching dependent graphs…