crawl_urls.py

import asyncio
import pickle
import random
import ssl
from pathlib import Path
from typing import List, Optional

import httpx
from pydantic import BaseModel, Field
from sqlmodel import select, Session
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult

from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy


class CrawlerResult(BaseModel):
    err: Optional[int] = 1
    search_result_model: Optional[SearchResultItem] = None
    crawl_result: Optional[CrawlResult] = None
    message: Optional[str] = None
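
# CrawlerResult.err codes as used by URLCrawler.crawl_url:
#   0 = success (or the item was already saved / dry run)
#   1 = PDF download or crawl failure
#   2 = skipped because the response is not HTML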


class URLCrawler:
    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.db_manager = SearchResultManager()
        # Create a custom SSL context that allows self-signed certificates
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def get_random_proxy(self, proxy_pool_url: Optional[str] = None) -> str:
        """Fetch a random proxy URL from the proxy pool in a worker thread."""
        return await asyncio.to_thread(get_random_proxy, proxy_pool_url)

    async def download_pdf(self, url: str, output_path: Path) -> bool:
        """Download a PDF file from a URL; return True only if a PDF was saved."""
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            response = await client.get(url)
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' in content_type:
                    with open(output_path, 'wb') as f:
                        f.write(response.content)
                    return True
            return False

    async def crawl_url(self, url: str, item_id: int, output_dir: Path, browser_config: BrowserConfig = None, overwrite: bool = False) -> CrawlerResult:
        """Crawl a single URL and save the results using item_id as the filename stem."""
        # Check whether this URL can be skipped
        item = None
        with Session(self.db_manager.engine) as session:
            item = session.exec(
                select(SearchResultItem)
                .where(SearchResultItem.id == item_id)
            ).first()
        if item and item.save_path and not overwrite:
            logger.info(f"Skipping {url} (item_id: {item_id}) - already has save_path: {item.save_path}")
            return CrawlerResult(err=0, message='already has save_path', search_result_model=item, crawl_result=None)
        if not browser_config:
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
                extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
            )
        logger.info(f"Crawling id: {item_id} URL: {url}")
        content_type = ''
        pdf_path = ''
        # First try to detect whether this is a PDF by checking the response headers
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            try:
                if url.endswith('.pdf'):
                    pdf_path = output_dir / f"{item_id}.pdf"
                else:
                    response = await client.head(url)
                    content_type = response.headers.get('content-type', '').lower()
                    if 'pdf' in content_type:
                        pdf_path = output_dir / f"{item_id}.pdf"
            except Exception as e:
                logger.warning(f"Failed to check headers for id: {item_id}, {url} {str(e)}")
        logger.info(f"crawl id {item_id} content_type {content_type} {pdf_path}")
        search_result_model = item or SearchResultItem(id=item_id)
        search_result_model.content_type = content_type
        if pdf_path:
            try:
                if await self.download_pdf(url, pdf_path):
                    search_result_model.save_path = str(pdf_path)
                else:
                    logger.warning(f"Failed to download PDF for id: {item_id}, {url}")
                    return CrawlerResult(err=1, message='failed to download pdf', search_result_model=None, crawl_result=None)
                self.db_manager.add_or_update_search_result_item(search_result_model)
                logger.info(f"{item_id} download_pdf success {pdf_path}")
                # For PDFs we must return here: when the target is a file, crawl4ai downloads it
                # to its own default path, so we would not get the auto-downloaded PDF unless we
                # additionally watched whether that download succeeded.
                return CrawlerResult(err=0, message='success', search_result_model=search_result_model, crawl_result=None)
            except Exception as e:
                logger.warning(f"Failed to download PDF for id: {item_id}, {url} {str(e)}")
                return CrawlerResult(err=1, message=str(e), search_result_model=None, crawl_result=None)
        if 'html' not in content_type:
            logger.info(f"Skipping {url} (item_id: {item_id}) - not html, content_type {content_type}")
            return CrawlerResult(err=2, message='not html', search_result_model=search_result_model, crawl_result=None)
        logger.info(f"crawler.arun start {item_id} content-type: {content_type}, {url}")
        logger.info(f"browser_config use_managed_browser {browser_config.use_managed_browser}, cdp_url: {browser_config.cdp_url}, headless: {browser_config.headless}")
        # Not a PDF and the content type looks like HTML: run a regular browser crawl
        crawler = AsyncWebCrawler(config=browser_config)
        await crawler.start()
        result: Optional[CrawlResult] = None
        try:
            crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
            result = await crawler.arun(url=url, config=crawl_config)
            logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
            # Save results
            ensure_output_dir(output_dir)
            # Save pickle
            pickle_path = output_dir / f"{item_id}.pkl"
            with open(pickle_path, "wb") as f:
                pickle.dump(result, f)
            # Save HTML and Markdown if available
            save_path = None
            if result.html:
                save_path = output_dir / f"{item_id}.html"
                save_to_file(result.html, save_path)
                search_result_model.save_path = str(save_path)
            if result.markdown:
                md_path = output_dir / f"{item_id}.md"
                save_to_file(result.markdown, md_path)
                search_result_model.markdown_path = str(md_path)
            # Update database with the HTML path
            if save_path:
                self.db_manager.add_or_update_search_result_item(search_result_model)
            logger.info(f"{item_id} crawler.arun result.success: {result.success}, save_path: {save_path}")
            return CrawlerResult(err=0, message='success', search_result_model=search_result_model, crawl_result=result)
        except Exception as e:
            logger.error(f"Failed to crawl id: {item_id}, {url} {str(e)}")
            return CrawlerResult(err=1, message=str(e), search_result_model=search_result_model, crawl_result=result)
        finally:
            await crawler.close()

    async def crawl_page_urls(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False, dry_run: bool = False):
        """Crawl all URLs belonging to a specific search result page."""
        with Session(self.db_manager.engine) as session:
            # Get page info
            page = session.exec(
                select(SearchPageResult)
                .where(SearchPageResult.id == page_id)
            ).first()
            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            logger.info(f"Extracted search result page: {page.html_path}")
            logger.info(f"Number of extracted results: {page.results_count}")
            # Create output directory
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)
            # Crawl URLs in parallel
            tasks = []
            for item in page.items:
                url = item.url
                item_id = item.id
                if dry_run:
                    logger.info(f"crawl_url {item_id} {url}")

                    async def dry_run_task():
                        return CrawlerResult(err=0, message='dry_run', search_result_model=None, crawl_result=None)

                    task = asyncio.create_task(dry_run_task())
                else:
                    task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                tasks.append(task)
            return urls_dir, await asyncio.gather(*tasks, return_exceptions=True)
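
    # Note: max_concurrent is stored in __init__ but not enforced here; the gather above
    # starts every crawl_url task at once. A minimal sketch of bounding concurrency with
    # asyncio.Semaphore (illustrative only; the bounded_crawl wrapper is an assumption,
    # not part of this module):
    #
    #   sem = asyncio.Semaphore(self.max_concurrent)
    #
    #   async def bounded_crawl(url, item_id):
    #       async with sem:
    #           return await self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
    #
    #   tasks.append(asyncio.create_task(bounded_crawl(url, item_id)))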

    async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific keyword"""
        with Session(self.db_manager.engine) as session:
            # Get the completed keyword task
            task = session.exec(
                select(KeywordTask)
                .where(KeywordTask.keyword == keyword)
                .where(KeywordTask.is_completed == True)
            ).first()
            if not task:
                raise ValueError(f"Completed keyword {keyword} not found")
            # Crawl URLs for each page
            for page in task.pages:
                await self.crawl_page_urls(page.id, browser_config, overwrite)


async def main():
    crawler = URLCrawler()
    # Example usage:
    # Crawl URLs for a specific keyword
    # await crawler.crawl_keyword_urls("example keyword", overwrite=True)
    # Or crawl URLs for a specific page
    await crawler.crawl_page_urls(594, overwrite=False)


if __name__ == "__main__":
    asyncio.run(main())
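

# Usage sketch: crawl every page of a completed keyword through the local proxy pool.
# Illustrative only; "example keyword" is a placeholder, and passing the proxy via
# BrowserConfig(proxy=...) assumes a crawl4ai version that accepts a proxy URL string.
#
#   async def crawl_keyword_via_proxy():
#       crawler = URLCrawler()
#       proxy_url = await crawler.get_random_proxy()
#       browser_config = BrowserConfig(
#           headless=True,
#           proxy=proxy_url,
#           extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
#       )
#       await crawler.crawl_keyword_urls("example keyword", browser_config=browser_config)
#
#   asyncio.run(crawl_keyword_via_proxy())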