MCPcopy
hub / github.com/unclecode/crawl4ai / process_html

Method process_html

crawl4ai/web_crawler.py:166–238  ·  view source on GitHub ↗
(
            self,
            url: str,
            html: str,
            extracted_content: str,
            word_count_threshold: int,
            extraction_strategy: ExtractionStrategy,
            chunking_strategy: ChunkingStrategy,
            css_selector: str,
            screenshot: bool,
            verbose: bool,
            is_cached: bool,
            **kwargs,
        )

Source from the content-addressed store, hash-verified

164 return CrawlResult(url=url, html="", success=False, error_message=e.msg)
165
166 def process_html(
167 self,
168 url: str,
169 html: str,
170 extracted_content: str,
171 word_count_threshold: int,
172 extraction_strategy: ExtractionStrategy,
173 chunking_strategy: ChunkingStrategy,
174 css_selector: str,
175 screenshot: bool,
176 verbose: bool,
177 is_cached: bool,
178 **kwargs,
179 ) -> CrawlResult:
180 t = time.time()
181 # Extract content from HTML
182 try:
183 t1 = time.time()
184 result = get_content_of_website_optimized(url, html, word_count_threshold, css_selector=css_selector, only_text=kwargs.get("only_text", False))
185 if verbose:
186 print(f"[LOG] 🚀 Content extracted for {url}, success: True, time taken: {time.time() - t1:.2f} seconds")
187
188 if result is None:
189 raise ValueError(f"Failed to extract content from the website: {url}")
190 except InvalidCSSSelectorError as e:
191 raise ValueError(str(e))
192
193 cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
194 markdown = sanitize_input_encode(result.get("markdown", ""))
195 media = result.get("media", [])
196 links = result.get("links", [])
197 metadata = result.get("metadata", {})
198
199 if extracted_content is None:
200 if verbose:
201 print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
202
203 sections = chunking_strategy.chunk(markdown)
204 extracted_content = extraction_strategy.run(url, sections)
205 extracted_content = json.dumps(extracted_content, indent=4, default=str, ensure_ascii=False)
206
207 if verbose:
208 print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t:.2f} seconds.")
209
210 screenshot = None if not screenshot else screenshot
211
212 if not is_cached:
213 cache_url(
214 url,
215 html,
216 cleaned_html,
217 markdown,
218 extracted_content,
219 True,
220 json.dumps(media),
221 json.dumps(links),
222 json.dumps(metadata),
223 screenshot=screenshot,

Callers 1

run · Method · 0.95

Calls 7

sanitize_input_encode · Function · 0.85
cache_url · Function · 0.85
CrawlResult · Class · 0.85
format_html · Function · 0.85
chunk · Method · 0.45
run · Method · 0.45

Tested by

no test coverage detected