# crawl_urls.py

import asyncio
import pickle
from pathlib import Path
import random
from typing import List
import httpx
import ssl
from sqlmodel import select, Session
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy

class URLCrawler:
    def __init__(self, max_concurrent: int = 3):
        self.max_concurrent = max_concurrent
        self.db_manager = SearchResultManager()
        # Create custom SSL context that allows self-signed certificates
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def get_random_proxy(self) -> str:
        return get_random_proxy()
        # Unreachable alternative kept for reference: fetch a port from a local
        # proxy-pool service (requires PROXY_POOL_BASE_URL to be defined):
        # url = f"{PROXY_POOL_BASE_URL}/get"
        # async with httpx.AsyncClient() as client:
        #     response = await client.get(url, timeout=30)
        #     response.raise_for_status()
        #     results = response.json()
        #     port = results["port"]
        #     return f'http://127.0.0.1:{port}'

    async def download_pdf(self, url: str, output_path: Path):
        """Download PDF file from URL"""
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            response = await client.get(url)
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' in content_type:
                    with open(output_path, 'wb') as f:
                        f.write(response.content)
                    return True
        return False

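    # A minimal sketch of a streamed variant for large files, assuming the same
    # SSL context and httpx client setup as download_pdf above. The method name
    # download_pdf_streamed is hypothetical and not part of the original module.
    async def download_pdf_streamed(self, url: str, output_path: Path) -> bool:
        """Stream a PDF to disk in chunks instead of buffering it all in memory."""
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            async with client.stream("GET", url) as response:
                if response.status_code != 200:
                    return False
                if 'pdf' not in response.headers.get('content-type', '').lower():
                    return False
                with open(output_path, 'wb') as f:
                    # aiter_bytes() yields the response body incrementally as it arrives
                    async for chunk in response.aiter_bytes():
                        f.write(chunk)
        return True
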
    async def crawl_url(self, url: str, item_id: int, output_dir: Path, browser_config: BrowserConfig = None, overwrite: bool = False) -> dict:
        """Crawl a single URL and save results with item_id as the filename"""
        # Check if we should skip this URL
        with Session(self.db_manager.engine) as session:
            item = session.exec(
                select(SearchResultItem)
                .where(SearchResultItem.id == item_id)
            ).first()
            if item and item.html_path and not overwrite:
                logger.info(f"Skipping {url} (item_id: {item_id}) - already has html_path: {item.html_path}")
                return {"search_result_model": item, "crawl_result": None, "message": "already has html_path"}

        if not browser_config:
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
                extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
            )
        crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
        logger.info(f"Crawling id: {item_id} URL: {url}")

        # First try to detect whether the URL serves a PDF by checking headers
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            try:
                response = await client.head(url)
                content_type = response.headers.get('content-type', '').lower()
                logger.info(f"crawl id {item_id} content_type {content_type}")
                if 'pdf' in content_type:
                    pdf_path = output_dir / f"{item_id}.pdf"
                    if await self.download_pdf(url, pdf_path):
                        # Update database with PDF path
                        with Session(self.db_manager.engine) as session:
                            item = session.exec(
                                select(SearchResultItem)
                                .where(SearchResultItem.id == item_id)
                            ).first()
                            if item:
                                item.html_path = str(pdf_path)
                                session.add(item)
                                session.commit()
                        return {"search_result_model": item, "crawl_result": None, "message": response.headers.get('content-type')}
                elif 'text/csv' in content_type:
                    return {"search_result_model": item, "crawl_result": None, "message": f"skip {content_type} id {item_id}"}
            except Exception as e:
                logger.warning(f"Failed to check headers for id: {item_id}, {url}: {str(e)}")

        # If not a PDF or the header check failed, try a regular crawl
        crawler = AsyncWebCrawler(config=browser_config)
        await crawler.start()
        try:
            result: CrawlResult = await crawler.arun(url=url, config=crawl_config)
            logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
            # If the crawl failed but the URL contains 'download', try the PDF path again
            if not result.success and 'download' in url.lower():
                pdf_path = output_dir / f"{item_id}.pdf"
                if await self.download_pdf(url, pdf_path):
                    # Update database with PDF path
                    with Session(self.db_manager.engine) as session:
                        item = session.exec(
                            select(SearchResultItem)
                            .where(SearchResultItem.id == item_id)
                        ).first()
                        if item:
                            item.html_path = str(pdf_path)
                            session.add(item)
                            session.commit()
                            session.refresh(item)
                    return {"search_result_model": item, "crawl_result": result}
            # Save results
            ensure_output_dir(output_dir)
            # Save pickle
            pickle_path = output_dir / f"{item_id}.pkl"
            with open(pickle_path, "wb") as f:
                pickle.dump(result, f)
            # Save HTML and Markdown if available
            html_path = None
            if result.html:
                html_path = output_dir / f"{item_id}.html"
                save_to_file(result.html, html_path)
            if result.markdown:
                md_path = output_dir / f"{item_id}.md"
                save_to_file(result.markdown, md_path)
            # Update database with HTML path
            if html_path:
                with Session(self.db_manager.engine) as session:
                    item = session.exec(
                        select(SearchResultItem)
                        .where(SearchResultItem.id == item_id)
                    ).first()
                    if item:
                        item.html_path = str(html_path)
                        session.add(item)
                        session.commit()
                        session.refresh(item)
            return {"search_result_model": item, "crawl_result": result}
        finally:
            await crawler.close()

    async def crawl_page_urls(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific search page"""
        with Session(self.db_manager.engine) as session:
            # Get page info
            page = session.exec(
                select(SearchPageResult)
                .where(SearchPageResult.id == page_id)
            ).first()
            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            logger.info(f"html_path: {page.html_path}, results_count: {page.results_count}")
            # Create output directory
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)
            # Crawl URLs in parallel
            tasks = []
            for item in page.items:
                url = item.url
                item_id = item.id
                task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                tasks.append(task)
            return await asyncio.gather(*tasks, return_exceptions=True)

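    # A minimal sketch of bounding parallelism with self.max_concurrent, which the
    # class stores but never uses; crawl_page_urls above gathers every item at once.
    # crawl_page_urls_bounded is a hypothetical variant, not part of the original module.
    async def crawl_page_urls_bounded(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False):
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def bounded_crawl(url: str, item_id: int, urls_dir: Path):
            # Each crawl acquires the semaphore, so at most max_concurrent run at once
            async with semaphore:
                return await self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)

        with Session(self.db_manager.engine) as session:
            page = session.exec(
                select(SearchPageResult).where(SearchPageResult.id == page_id)
            ).first()
            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)
            tasks = [bounded_crawl(item.url, item.id, urls_dir) for item in page.items]
            return await asyncio.gather(*tasks, return_exceptions=True)
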
    async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific keyword"""
        with Session(self.db_manager.engine) as session:
            # Get keyword task
            task = session.exec(
                select(KeywordTask)
                .where(KeywordTask.keyword == keyword)
                .where(KeywordTask.is_completed == True)
            ).first()
            if not task:
                raise ValueError(f"Completed keyword {keyword} not found")
            # Crawl URLs for each page
            for page in task.pages:
                await self.crawl_page_urls(page.id, browser_config, overwrite)


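# A minimal sketch of wiring the (currently unused) proxy helper into a crawl,
# assuming crawl4ai's BrowserConfig accepts a proxy URL string via a `proxy`
# parameter; verify the parameter name against the installed crawl4ai version.
# crawl_page_via_proxy is a hypothetical helper, not part of the original module.
async def crawl_page_via_proxy(page_id: int, overwrite: bool = False):
    crawler = URLCrawler()
    proxy_url = await crawler.get_random_proxy()  # e.g. 'http://127.0.0.1:<port>'
    browser_config = BrowserConfig(
        headless=True,
        verbose=False,
        proxy=proxy_url,  # assumed parameter name
        extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
    )
    return await crawler.crawl_page_urls(page_id, browser_config=browser_config, overwrite=overwrite)

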
async def main():
    crawler = URLCrawler()
    # Example usage:
    # Crawl URLs for a specific keyword
    # await crawler.crawl_keyword_urls("example keyword", overwrite=True)
    # Or crawl URLs for a specific page
    await crawler.crawl_page_urls(594, overwrite=False)


if __name__ == "__main__":
    asyncio.run(main())