# crawl_urls.py

import asyncio
import pickle
from pathlib import Path
import random
from typing import List
import httpx
import ssl
from sqlmodel import select, Session
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy

class URLCrawler:
    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.db_manager = SearchResultManager()
        # Create a custom SSL context that accepts self-signed certificates
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def get_random_proxy(self) -> str:
        return get_random_proxy()
        # Alternative (kept as a comment because it was unreachable and relies on an
        # undefined PROXY_POOL_BASE_URL): fetch a port from a local proxy-pool service.
        # url = f"{PROXY_POOL_BASE_URL}/get"
        # async with httpx.AsyncClient() as client:
        #     response = await client.get(url, timeout=30)
        #     response.raise_for_status()
        #     results = response.json()
        #     port = results["port"]
        #     return f'http://127.0.0.1:{port}'

    async def download_pdf(self, url: str, output_path: Path):
        """Download PDF file from URL"""
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            response = await client.get(url)
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' in content_type:
                    with open(output_path, 'wb') as f:
                        f.write(response.content)
                    return True
        return False
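
    # Note (illustrative sketch, not part of the original code): download_pdf above reads
    # the whole response into memory. For large PDFs a streaming variant of the method
    # body, using httpx's client.stream() and aiter_bytes(), could look like this:
    #
    #     async with httpx.AsyncClient(verify=self.ssl_context) as client:
    #         async with client.stream("GET", url) as response:
    #             content_type = response.headers.get('content-type', '').lower()
    #             if response.status_code == 200 and 'pdf' in content_type:
    #                 with open(output_path, 'wb') as f:
    #                     async for chunk in response.aiter_bytes():
    #                         f.write(chunk)
    #                 return True
    #     return False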

    async def crawl_url(self, url: str, item_id: int, output_dir: Path, browser_config: BrowserConfig = None, overwrite: bool = False) -> dict:
        """Crawl a single URL and save the results, using item_id as the filename stem."""
        # Check whether this URL can be skipped
        with Session(self.db_manager.engine) as session:
            item = session.exec(
                select(SearchResultItem)
                .where(SearchResultItem.id == item_id)
            ).first()
            if item and item.html_path and not overwrite:
                logger.info(f"Skipping {url} (item_id: {item_id}) - already has html_path: {item.html_path}")
                return {"search_result_model": item, "crawl_result": None, 'message': 'already has html_path'}

        if not browser_config:
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
                extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
                proxy=await self.get_random_proxy()
            )
        crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
        logger.info(f"Crawling id: {item_id} URL: {url}")

        # First, try to detect a PDF by checking the response headers
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            try:
                response = await client.head(url)
                content_type = response.headers.get('content-type', '').lower()
                logger.info(f"crawl id {item_id} content_type {content_type}")
                if 'pdf' in content_type:
                    pdf_path = output_dir / f"{item_id}.pdf"
                    if await self.download_pdf(url, pdf_path):
                        # Update the database with the PDF path
                        with Session(self.db_manager.engine) as session:
                            item = session.exec(
                                select(SearchResultItem)
                                .where(SearchResultItem.id == item_id)
                            ).first()
                            if item:
                                item.html_path = str(pdf_path)
                                session.add(item)
                                session.commit()
                        return {"search_result_model": item, "crawl_result": None, 'message': response.headers.get('content-type')}
                elif 'text/csv' in content_type:
                    return {"search_result_model": item, "crawl_result": None, 'message': f'skip {content_type} id {item_id}'}
            except Exception as e:
                logger.warning(f"Failed to check headers for id: {item_id}, url: {url}: {e}")

        # If it is not a PDF, or the header check failed, fall back to a regular crawl
        crawler = AsyncWebCrawler(config=browser_config)
        await crawler.start()
        try:
            result: CrawlResult = await crawler.arun(url=url, config=crawl_config)
            logger.info(f"{item_id} crawler.arun result.success: {result.success} status_code: {result.status_code}")

            # If the crawl failed but the URL contains 'download', try a PDF download instead
            if not result.success and 'download' in url.lower():
                pdf_path = output_dir / f"{item_id}.pdf"
                if await self.download_pdf(url, pdf_path):
                    # Update the database with the PDF path
                    with Session(self.db_manager.engine) as session:
                        item = session.exec(
                            select(SearchResultItem)
                            .where(SearchResultItem.id == item_id)
                        ).first()
                        if item:
                            item.html_path = str(pdf_path)
                            session.add(item)
                            session.commit()
                            session.refresh(item)
                    return {"search_result_model": item, "crawl_result": result}

            # Save results
            ensure_output_dir(output_dir)

            # Save pickle
            pickle_path = output_dir / f"{item_id}.pkl"
            with open(pickle_path, "wb") as f:
                pickle.dump(result, f)

            # Save HTML and Markdown if available
            html_path = None
            if result.html:
                html_path = output_dir / f"{item_id}.html"
                save_to_file(result.html, html_path)
            if result.markdown:
                md_path = output_dir / f"{item_id}.md"
                save_to_file(result.markdown, md_path)

            # Update the database with the HTML path
            if html_path:
                with Session(self.db_manager.engine) as session:
                    item = session.exec(
                        select(SearchResultItem)
                        .where(SearchResultItem.id == item_id)
                    ).first()
                    if item:
                        item.html_path = str(html_path)
                        session.add(item)
                        session.commit()
                        session.refresh(item)
            return {"search_result_model": item, "crawl_result": result}
        finally:
            await crawler.close()

    async def crawl_page_urls(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific search page"""
        with Session(self.db_manager.engine) as session:
            # Get page info
            page = session.exec(
                select(SearchPageResult)
                .where(SearchPageResult.id == page_id)
            ).first()
            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            logger.info(f"html_path: {page.html_path}, results_count: {page.results_count}")

            # Create the output directory
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)

            # Crawl URLs in parallel
            tasks = []
            for item in page.items:
                url = item.url
                item_id = item.id
                task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                tasks.append(task)
            await asyncio.gather(*tasks, return_exceptions=True)
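
    # Note (illustrative sketch, not from the original code): max_concurrent is stored in
    # __init__ but never applied above; asyncio.gather launches every crawl_url task at once.
    # One way to honor the limit would be an asyncio.Semaphore wrapper inside crawl_page_urls:
    #
    #     sem = asyncio.Semaphore(self.max_concurrent)
    #
    #     async def bounded_crawl(url, item_id):
    #         async with sem:
    #             return await self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
    #
    #     tasks = [bounded_crawl(item.url, item.id) for item in page.items]
    #     await asyncio.gather(*tasks, return_exceptions=True)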

    async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific keyword"""
        with Session(self.db_manager.engine) as session:
            # Get the completed keyword task
            task = session.exec(
                select(KeywordTask)
                .where(KeywordTask.keyword == keyword)
                .where(KeywordTask.is_completed == True)
            ).first()
            if not task:
                raise ValueError(f"Completed keyword {keyword} not found")

            # Crawl the URLs for each page
            for page in task.pages:
                await self.crawl_page_urls(page.id, browser_config, overwrite)


async def main():
    crawler = URLCrawler()
    # Example usage:
    # Crawl URLs for a specific keyword
    # await crawler.crawl_keyword_urls("example keyword", overwrite=True)
    # Or crawl URLs for a specific page
    await crawler.crawl_page_urls(594, overwrite=False)


if __name__ == "__main__":
    asyncio.run(main())
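
# Example (sketch, values are illustrative): a custom BrowserConfig can be passed through
# crawl_keyword_urls / crawl_page_urls instead of the per-URL default built in crawl_url, e.g.:
#
#     config = BrowserConfig(
#         headless=True,
#         verbose=False,
#         extra_args=["--disable-gpu", "--no-sandbox"],
#     )
#     asyncio.run(URLCrawler(max_concurrent=5).crawl_keyword_urls("example keyword", browser_config=config))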