crawl_urls.py

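"""Crawl the URLs collected for completed keyword searches.

For each SearchResultItem the crawler either downloads the target directly as a
PDF or renders it with crawl4ai's AsyncWebCrawler, saves the result (pickle,
HTML, Markdown) under a crawled_urls directory next to the search page's HTML,
and writes the saved path back to the database.
"""
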
import asyncio
import pickle
from pathlib import Path
import random
from typing import List
import httpx
import ssl
from sqlmodel import select, Session
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
from mylib.logu import logger
from utils.proxy_pool import get_random_proxy


class URLCrawler:
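    """Download and archive the URLs referenced by stored search results.

    Crawled pages are saved next to the search page's HTML and the saved path is
    written back to SearchResultItem.html_path via SearchResultManager.
    """
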
    def __init__(self, max_concurrent: int = 3):
        # NOTE: max_concurrent is stored but not currently enforced anywhere in this class.
        self.max_concurrent = max_concurrent
        self.db_manager = SearchResultManager()
        # Create custom SSL context that allows self-signed certificates
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE

    async def get_random_proxy(self, proxy_pool_url: str = None) -> str:
        # get_random_proxy from utils.proxy_pool is assumed to be blocking, so it
        # runs in a worker thread to avoid stalling the event loop.
        return await asyncio.to_thread(get_random_proxy, proxy_pool_url)
        # Unreachable legacy fallback (PROXY_POOL_BASE_URL is not defined in this module):
        # url = f"{PROXY_POOL_BASE_URL}/get"
        # async with httpx.AsyncClient() as client:
        #     response = await client.get(url, timeout=30)
        #     response.raise_for_status()
        #     results = response.json()
        #     port = results["port"]
        #     return f'http://127.0.0.1:{port}'

    async def download_pdf(self, url: str, output_path: Path):
        """Download PDF file from URL"""
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            response = await client.get(url)
            if response.status_code == 200:
                content_type = response.headers.get('content-type', '').lower()
                if 'pdf' in content_type:
                    with open(output_path, 'wb') as f:
                        f.write(response.content)
                    return True
        return False

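    # crawl_url flow: skip items that already have an html_path, fast-path likely
    # PDFs via download_pdf, otherwise render the page with AsyncWebCrawler.
    # Every return path uses the same dict shape:
    #   {"search_result_model": item or None, "crawl_result": CrawlResult or None, "message": optional str}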
    async def crawl_url(self, url: str, item_id: int, output_dir: Path, browser_config: BrowserConfig = None, overwrite: bool = False) -> dict:
        """Crawl a single URL and save results with item_id as filename"""
        # Check if we should skip this URL
        with Session(self.db_manager.engine) as session:
            item = session.exec(
                select(SearchResultItem)
                .where(SearchResultItem.id == item_id)
            ).first()
            if item and item.html_path and not overwrite:
                logger.info(f"Skipping {url} (item_id: {item_id}) - already has html_path: {item.html_path}")
                return {"search_result_model": item, "crawl_result": None, 'message': 'already has html_path'}
        if not browser_config:
            browser_config = BrowserConfig(
                headless=True,
                verbose=False,
                extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
            )
        logger.info(f"Crawling id: {item_id} URL: {url}")
        content_type = ''
        # First try to detect if it's a PDF by checking headers
        async with httpx.AsyncClient(verify=self.ssl_context) as client:
            try:
                pdf_path = ''
                if url.endswith('.pdf'):
                    pdf_path = output_dir / f"{item_id}.pdf"
                else:
                    response = await client.head(url)
                    content_type = response.headers.get('content-type', '').lower()
                    if 'pdf' in content_type:
                        pdf_path = output_dir / f"{item_id}.pdf"
                logger.info(f"crawl id {item_id} content_type {content_type} {pdf_path}")
                if pdf_path:
                    if await self.download_pdf(url, pdf_path):
                        # Update database with PDF path
                        with Session(self.db_manager.engine) as session:
                            item = session.exec(
                                select(SearchResultItem)
                                .where(SearchResultItem.id == item_id)
                            ).first()
                            if item:
                                item.html_path = str(pdf_path)
                                session.add(item)
                                session.commit()
                        # content_type is '' when the URL itself ended in .pdf (no HEAD request was made)
                        return {"search_result_model": item, "crawl_result": None, 'message': content_type or 'pdf'}
            except Exception as e:
                logger.warning(f"Failed to check headers for id: {item_id} , {url} {str(e)}")
                # return {"search_result_model": None, "crawl_result": None, 'message': str(e)}
        # if 'html' not in content_type:
        #     logger.info(f"Skipping {url} (item_id: {item_id}) - not html, content_type {content_type}")
        #     return {"search_result_model": None, "crawl_result": None, 'message': f'not html, content_type {content_type}'}
        logger.info(f"crawler.arun start {item_id} content-type: {content_type}, {url} ")
        logger.info(f"browser_config use_managed_browser {browser_config.use_managed_browser} , cdp_url: {browser_config.cdp_url}, headless: {browser_config.headless}")
        # If not PDF or header check failed, try regular crawl
        crawler = AsyncWebCrawler(config=browser_config)
        await crawler.start()
        try:
            crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
            result: CrawlResult = await crawler.arun(url=url, config=crawl_config)
            logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
            # If crawl failed but URL contains 'download', try PDF again
            if not result.success and 'download' in url.lower():
                pdf_path = output_dir / f"{item_id}.pdf"
                if await self.download_pdf(url, pdf_path):
                    # Update database with PDF path
                    with Session(self.db_manager.engine) as session:
                        item = session.exec(
                            select(SearchResultItem)
                            .where(SearchResultItem.id == item_id)
                        ).first()
                        if item:
                            item.html_path = str(pdf_path)
                            session.add(item)
                            session.commit()
                            session.refresh(item)
                    return {"search_result_model": item, "crawl_result": result}
            # Save results
            ensure_output_dir(output_dir)
            # Save pickle
            pickle_path = output_dir / f"{item_id}.pkl"
            with open(pickle_path, "wb") as f:
                pickle.dump(result, f)
            # Save HTML and Markdown if available
            html_path = None
            if result.html:
                html_path = output_dir / f"{item_id}.html"
                save_to_file(result.html, html_path)
            if result.markdown:
                md_path = output_dir / f"{item_id}.md"
                save_to_file(result.markdown, md_path)
            # Update database with HTML path
            if html_path:
                with Session(self.db_manager.engine) as session:
                    item = session.exec(
                        select(SearchResultItem)
                        .where(SearchResultItem.id == item_id)
                    ).first()
                    if item:
                        item.html_path = str(html_path)
                        session.add(item)
                        session.commit()
                        session.refresh(item)
                        logger.info(f"{item_id} updated item.html_path: {item}")
            return {"search_result_model": item, "crawl_result": result}
        except Exception as e:
            logger.error(f"Failed to crawl id: {item_id} , {url} {str(e)}")
            return {"search_result_model": item, "crawl_result": None, 'message': str(e)}
        finally:
            await crawler.close()

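    # crawl_page_urls builds one crawl_url coroutine per item on the page and runs
    # them concurrently; asyncio.gather(return_exceptions=True) places raised
    # exceptions in the results list instead of propagating them, so callers
    # should inspect each entry.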
    async def crawl_page_urls(self, page_id: int, browser_config: BrowserConfig = None, overwrite: bool = False, dry_run: bool = False):
        """Crawl all URLs for a specific search page"""
        with Session(self.db_manager.engine) as session:
            # Get page info
            page = session.exec(
                select(SearchPageResult)
                .where(SearchPageResult.id == page_id)
            ).first()
            if not page:
                raise ValueError(f"Page with id {page_id} not found")
            logger.info(f"Extracting search results page: {page.html_path}")
            logger.info(f"Number of extracted results: {page.results_count}")
            # Create output directory
            html_dir = Path(page.html_path).parent if page.html_path else Path("output")
            urls_dir = html_dir / "crawled_urls"
            ensure_output_dir(urls_dir)
            # Crawl URLs in parallel
            tasks = []
            for item in page.items:
                url = item.url
                item_id = item.id
                if dry_run:
                    logger.info(f"crawl_url {item_id} {url}")

                    async def dry_run_task():
                        dry_run_ret = {"search_result_model": '', "crawl_result": None, 'message': 'dry_run'}
                        return dry_run_ret

                    task = asyncio.create_task(dry_run_task())
                else:
                    task = self.crawl_url(url, item_id, urls_dir, browser_config, overwrite)
                tasks.append(task)
            return urls_dir, await asyncio.gather(*tasks, return_exceptions=True)

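    # crawl_keyword_urls walks a completed KeywordTask page by page: pages are
    # crawled sequentially, while the URLs within each page are crawled
    # concurrently by crawl_page_urls.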
    async def crawl_keyword_urls(self, keyword: str, browser_config: BrowserConfig = None, overwrite: bool = False):
        """Crawl all URLs for a specific keyword"""
        with Session(self.db_manager.engine) as session:
            # Get keyword task
            task = session.exec(
                select(KeywordTask)
                .where(KeywordTask.keyword == keyword)
                .where(KeywordTask.is_completed == True)
            ).first()
            if not task:
                raise ValueError(f"Completed keyword {keyword} not found")
            # Crawl URLs for each page
            for page in task.pages:
                await self.crawl_page_urls(page.id, browser_config, overwrite)


async def main():
    crawler = URLCrawler()
    # Example usage:
    # Crawl URLs for a specific keyword
    # await crawler.crawl_keyword_urls("example keyword", overwrite=True)
    # Or crawl URLs for a specific page
    await crawler.crawl_page_urls(594, overwrite=False)


if __name__ == "__main__":
    asyncio.run(main())
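
# A dry run only logs the item URLs without crawling them, which is a cheap way
# to preview what a page would fetch, e.g.:
#   urls_dir, results = await crawler.crawl_page_urls(594, dry_run=True)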