search_manager.py

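"""Google search helper built on DrissionPage.

Runs Google searches in a Chromium page, saves each result page to
disk and to the database, and extracts result links with lxml.
"""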
from pathlib import Path
import asyncio

from DrissionPage import ChromiumPage
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from lxml import html

from mylib.base import (
    save_to_file,
    OUTPUT_DIR,
    ensure_output_dir,
    replace_space,
    browser_config,
)
from database.sql_model import DatabaseManager, SearchResult


class SearchManager:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = DatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0) -> SearchResult:
        """Search for a keyword, save the result page, and record it.

        Args:
            keyword: The keyword to search for.
            start: Offset of the first result.

        Returns:
            The SearchResult record saved to the database.
        """
        url = f"https://www.google.com/search?q={keyword}&start={start}"
        self.page.get(url)
        # Save the rendered HTML to disk
        html_path = self.save_page(keyword, start)
        # Record the search in the database
        result = self.db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
        )
        return result

    def next_page(self, keyword: str, current_start: int) -> SearchResult:
        """Fetch the next page of results.

        Google paginates results in steps of 10, so the next page
        starts at current_start + 10.

        Args:
            keyword: The keyword to search for.
            current_start: Offset of the current page's first result.

        Returns:
            The SearchResult record for the next page.
        """
        return self.search_keyword(keyword, current_start + 10)

    def save_page(self, keyword: str, start: int) -> Path:
        """Save the current page's HTML to OUTPUT_DIR/<keyword>/<start>.html."""
        keyword = replace_space(keyword)
        save_dir = OUTPUT_DIR / keyword
        ensure_output_dir(save_dir)
        save_path = save_dir / f"{start}.html"
        save_to_file(self.page.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Crawl a result URL with crawl4ai, reusing cached fetches when available."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random',
            )

    def is_search_result_empty(self, html_content: str) -> bool:
        """Check whether the search results container is empty."""
        tree = html.fromstring(html_content)
        search_elements = tree.xpath('//*[@id="search"]/*')
        return len(search_elements) == 0

    def go_to_next_page(self) -> bool:
        """Click through to the next results page; return False if there is none."""
        next_button = self.page.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click back to the previous results page; return False if there is none."""
        prev_button = self.page.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result links from a search results page.

        Args:
            html_content: HTML of the results page.

        Returns:
            A list of all search result links on the page.
        """
        tree = html.fromstring(html_content)
        rso_nodes = tree.xpath('//*[@id="search"]//*[@id="rso"]')
        if not rso_nodes:  # No results container, e.g. an empty or blocked page
            return []
        links = []
        for element in rso_nodes[0].xpath('.//*[@href]'):
            href = element.get('href')
            if href and not href.startswith('#'):
                links.append(href)
        return links


async def main():
    from mylib.drission_page import load_chrome_from_ini

    # Initialize the browser
    page = load_chrome_from_ini()
    manager = SearchManager(page)

    # Example usage
    keyword = "Acalypha matsudae essential oil"

    # Search for the keyword
    res = manager.search_keyword(keyword)
    print(f"Found results: {res.model_dump_json(indent=4)}")
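
    # Illustrative follow-up (an assumption, not part of the original flow):
    # pull result links directly from the live page's HTML.
    links = manager.extract_search_results(page.html)
    print(f"Extracted {len(links)} links")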


if __name__ == "__main__":
    asyncio.run(main())