search_manager.py

from pathlib import Path
import asyncio
from urllib.parse import quote_plus
from DrissionPage import ChromiumPage
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from mylib.base import (
    save_to_file,
    OUTPUT_DIR,
    ensure_output_dir,
    replace_space,
    browser_config,
)
from database.sql_model import DatabaseManager, SearchResult
from lxml import html


class SearchManager:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = DatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0) -> SearchResult:
        """Search for a keyword and persist the result page.

        Args:
            keyword: The keyword to search for.
            start: Starting result offset.

        Returns:
            The saved SearchResult record (keyword, offset, URL, HTML path).
        """
        # quote_plus keeps multi-word keywords valid in the query string
        url = f"https://www.google.com/search?q={quote_plus(keyword)}&start={start}"
        self.page.get(url)
        # Save the raw HTML of the result page
        html_path = self.save_page(keyword, start)
        # Persist the search metadata to the database
        return self.db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
        )

    def next_page(self, keyword: str, current_start: int) -> SearchResult:
        """Move to the next result page.

        Args:
            keyword: The keyword to search for.
            current_start: Current starting result offset.

        Returns:
            The saved SearchResult record for the next page.
        """
        # search_keyword is synchronous, so no async/await is needed here;
        # Google paginates in steps of 10 results
        return self.search_keyword(keyword, current_start + 10)

    def save_page(self, keyword: str, start: int) -> Path:
        """Save the current page's HTML under OUTPUT_DIR/<keyword>/<start>.html."""
        keyword = replace_space(keyword)
        save_dir = OUTPUT_DIR / keyword
        ensure_output_dir(save_dir)
        save_path = save_dir / f"{start}.html"
        save_to_file(self.page.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Crawl a result URL with crawl4ai and return the CrawlResult."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random',
            )
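
    # Sketch (hypothetical helper, not part of the original class): shows one
    # way _process_page might be consumed. Assumes crawl4ai's CrawlResult,
    # which exposes `success` and `markdown` fields.
    async def fetch_result_markdown(self, url: str):
        """Sketch: crawl a result URL and return its markdown, or None on failure."""
        result = await self._process_page(url)
        return result.markdown if result.success else None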

    def is_search_result_empty(self, html_content: str) -> bool:
        """Return True if the page has no children under the #search container."""
        tree = html.fromstring(html_content)
        search_elements = tree.xpath('//*[@id="search"]/*')
        return len(search_elements) == 0

    def take_screenshot(self, save_path: Path) -> Path:
        """Take a screenshot of the current page and return the saved path."""
        # DrissionPage returns the saved image path; wrap it to match the annotation
        return Path(self.page.get_screenshot(path=str(save_path)))

    def check_cache(self, file_path: Path) -> bool:
        """Return True if the cached HTML file exists."""
        return file_path.exists()

    def load_from_cache(self, file_path: Path):
        """Load a previously saved page from its local file."""
        # as_uri() builds a well-formed file:// URL on every platform
        self.page.get(file_path.resolve().as_uri())
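
    # Sketch (hypothetical helper, not part of the original class): check_cache
    # and load_from_cache are naturally paired; this shows one way to fall back
    # to a live search when no cached page exists, mirroring save_page's layout.
    def load_or_search(self, keyword: str, start: int = 0) -> None:
        """Sketch: load a cached result page if present, otherwise search live."""
        cached = OUTPUT_DIR / replace_space(keyword) / f"{start}.html"
        if self.check_cache(cached):
            self.load_from_cache(cached)
        else:
            self.search_keyword(keyword, start)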

    def go_to_next_page(self) -> bool:
        """Click through to the next result page; return False if there is none."""
        next_button = self.page.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click through to the previous result page; return False if there is none."""
        prev_button = self.page.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result links from a search result page.

        Args:
            html_content: The page's HTML content.

        Returns:
            A list of all result links found on the page.
        """
        tree = html.fromstring(html_content)
        rso_nodes = tree.xpath('//*[@id="search"]//*[@id="rso"]')
        if not rso_nodes:
            # No #rso container (e.g. an empty or blocked result page)
            return []
        links = []
        for element in rso_nodes[0].xpath('.//*[@href]'):
            href = element.get('href')
            if href and not href.startswith('#'):
                links.append(href)
        return links
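

# Sketch (hypothetical helper, not part of the original module): one way to
# combine the methods above into a simple pagination loop. It reuses the page
# HTML already loaded by search_keyword instead of re-reading the saved file.
def crawl_keyword(manager: SearchManager, keyword: str, max_pages: int = 3) -> list[str]:
    """Sketch: collect result links across up to max_pages result pages."""
    links: list[str] = []
    start = 0
    for _ in range(max_pages):
        manager.search_keyword(keyword, start)
        page_html = manager.page.html
        if manager.is_search_result_empty(page_html):
            break  # no results container, stop paginating
        links.extend(manager.extract_search_results(page_html))
        start += 10  # Google paginates in steps of 10
    return links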


async def main():
    from mylib.drission_page import load_chrome_from_ini

    # Initialize the browser
    page = load_chrome_from_ini()
    manager = SearchManager(page)

    # Example usage: search a keyword and print the stored record
    keyword = "Acalypha matsudae essential oil"
    res = manager.search_keyword(keyword)
    print(f"Found results: {res.model_dump_json(indent=4)}")


if __name__ == "__main__":
    asyncio.run(main())
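
# Example run (assuming mylib's ini config points at a working Chrome profile):
#   python search_manager.py
# performs one search, saves the HTML under OUTPUT_DIR/<keyword>/0.html, and
# prints the stored SearchResult as JSON.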