import asyncio
import pickle
from pathlib import Path
import random
from typing import List

import httpx
import ssl
from sqlmodel import select, Session
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult

from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy


class URLCrawler:
    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.db_manager = SearchResultManager()
        # Create a custom SSL context that skips certificate verification (allows self-signed certificates)
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def get_random_proxy(self) -> str:
        return get_random_proxy()

    async def download_pdf(self, url: str, output_path: Path) -> bool:
        """Download a PDF file from a URL. Returns True on success."""
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            response = await client.get(url)
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' in content_type:
                    with open(output_path, 'wb') as f:
                        f.write(response.content)
                    return True
        return False

    async def crawl_url(self, url: str, item_id: int, output_dir: Path,
                        browser_config: BrowserConfig = None, overwrite: bool = False) -> dict:
        """Crawl a single URL and save the results using item_id as the filename."""
        # Skip this URL if it already has an html_path and overwrite is not requested
        with Session(self.db_manager.engine) as session:
            item = session.exec(
                select(SearchResultItem)
                .where(SearchResultItem.id == item_id)
            ).first()
            if item and item.html_path and not overwrite:
                logger.info(f"Skipping {url} (item_id: {item_id}) - already has html_path: {item.html_path}")
                return {"search_result_model": item, "crawl_result": None, "message": "already has html_path"}

        if not browser_config:
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
                extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
            )
        crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)

        logger.info(f"Crawling id: {item_id} URL: {url}")

        # First, try to detect whether the URL points to a PDF by checking the response headers
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            try:
                response = await client.head(url)
                content_type = response.headers.get('content-type', '').lower()
                logger.info(f"crawl id {item_id} content_type {content_type}")
                if 'pdf' in content_type:
                    pdf_path = output_dir / f"{item_id}.pdf"
                    if await self.download_pdf(url, pdf_path):
                        # Update the database with the PDF path
                        with Session(self.db_manager.engine) as session:
                            item = session.exec(
                                select(SearchResultItem)
                                .where(SearchResultItem.id == item_id)
                            ).first()
                            if item:
                                item.html_path = str(pdf_path)
                                session.add(item)
                                session.commit()
                        return {"search_result_model": item, "crawl_result": None,
                                "message": response.headers.get('content-type')}
                elif 'text/csv' in content_type:
                    return {"search_result_model": item, "crawl_result": None,
                            "message": f"skip {content_type} id {item_id}"}
            except Exception as e:
                logger.warning(f"Failed to check headers for id: {item_id} , {url} {str(e)}")

        # If it is not a PDF, or the header check failed, fall back to a regular crawl
        crawler = AsyncWebCrawler(config=browser_config)
        await crawler.start()
        try:
            result: CrawlResult = await crawler.arun(url=url, config=crawl_config)
            logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")

            # If the crawl failed but the URL contains 'download', try a PDF download again
            if not result.success and 'download' in url.lower():
                pdf_path = output_dir / f"{item_id}.pdf"
                if await self.download_pdf(url, pdf_path):
                    # Update the database with the PDF path
                    with Session(self.db_manager.engine) as session:
                        item = session.exec(
                            select(SearchResultItem)
                            .where(SearchResultItem.id == item_id)
                        ).first()
                        if item:
                            item.html_path = str(pdf_path)
                            session.add(item)
                            session.commit()
                            session.refresh(item)
                    return {"search_result_model": item, "crawl_result": result}

            # Save results
            ensure_output_dir(output_dir)

            # Save the full crawl result as a pickle
            pickle_path = output_dir / f"{item_id}.pkl"
            with open(pickle_path, "wb") as f:
                pickle.dump(result, f)

            # Save HTML and Markdown if available
            html_path = None
            if result.html:
                html_path = output_dir / f"{item_id}.html"
                save_to_file(result.html, html_path)
            if result.markdown:
                md_path = output_dir / f"{item_id}.md"
                save_to_file(result.markdown, md_path)

            # Update the database with the HTML path
            if html_path:
                with Session(self.db_manager.engine) as session:
                    item = session.exec(
                        select(SearchResultItem)
                        .where(SearchResultItem.id == item_id)
                    ).first()
                    if item:
                        item.html_path = str(html_path)
                        session.add(item)
                        session.commit()
                        session.refresh(item)

            return {"search_result_model": item, "crawl_result": result}
        finally:
            await crawler.close()

    async def crawl_page_urls(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific search page."""
        with Session(self.db_manager.engine) as session:
            # Get page info
            page = session.exec(
                select(SearchPageResult)
                .where(SearchPageResult.id == page_id)
            ).first()
            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            logger.info(f"html_path: {page.html_path} , page: {page.results_count}")

            # Create the output directory next to the page's HTML file
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)

            # Crawl URLs in parallel
            tasks = []
            for item in page.items:
                url = item.url
                item_id = item.id
                task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                tasks.append(task)

            return await asyncio.gather(*tasks, return_exceptions=True)

    async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific keyword."""
        with Session(self.db_manager.engine) as session:
            # Get the completed keyword task
            task = session.exec(
                select(KeywordTask)
                .where(KeywordTask.keyword == keyword)
                .where(KeywordTask.is_completed == True)
            ).first()
            if not task:
                raise ValueError(f"Completed keyword {keyword} not found")

            # Crawl URLs for each result page of the keyword
            for page in task.pages:
                await self.crawl_page_urls(page.id, browser_config, overwrite)


async def main():
    crawler = URLCrawler()

    # Example usage:
    # Crawl URLs for a specific keyword
    # await crawler.crawl_keyword_urls("example keyword", overwrite=True)

    # Or crawl URLs for a specific page
    await crawler.crawl_page_urls(594, overwrite=False)


if __name__ == "__main__":
    asyncio.run(main())