"""Google search crawler.

SearchManager drives a DrissionPage Chromium browser through paginated
Google result pages, saving each page's HTML under GOOGLE_SEARCH_DIR and
recording SearchResult rows in a local database for cache reuse.
"""
import asyncio
from pathlib import Path
from urllib.parse import quote_plus

from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from DrissionPage import ChromiumPage
from lxml import html

from database.excel_import import ExcelDatabaseManager, KeywordModel
from database.search_model import SearchDatabaseManager, SearchResult
from database.sqlite_engine import create_db_and_tables, drop_table
from mylib.base import (
    browser_config,
    ensure_output_dir,
    replace_space,
    save_to_file,
)
from mylib.drission_page import load_chrome_from_ini
from mylib.settings import GOOGLE_SEARCH_DIR
# NOTE(review): importing this module launches/attaches a Chromium instance —
# an import-time side effect; consider moving it under the __main__ guard.
# The instance is used as a module-level global by test_one().
page = load_chrome_from_ini()
  18. class SearchManager:
  19. def __init__(self, page: ChromiumPage):
  20. self.page = page
  21. self.search_db_manager = SearchDatabaseManager()
  22. self.excel_db_manager = ExcelDatabaseManager()
  23. def search_keyword(self, keyword: str, start: int = 0, cache: bool = True) -> SearchResult:
  24. """搜索关键词并返回结果链接和保存的HTML文件路径
  25. Args:
  26. keyword: 要搜索的关键词
  27. start: 起始结果位置
  28. cache: 是否使用缓存
  29. Returns:
  30. 包含搜索结果链接的列表和保存的HTML文件路径
  31. """
  32. # 检查缓存
  33. if cache:
  34. existing_result = self.search_db_manager.get_existing_result(keyword, start)
  35. print(f"Using existing result for {keyword} {start}")
  36. if existing_result:
  37. return existing_result
  38. # 执行搜索
  39. url = f"https://www.google.com/search?q={keyword}&start={start}"
  40. self.page.get(url)
  41. # 保存HTML文件
  42. html_path = self.save_page(keyword, start)
  43. # 检查是否是最后一页
  44. is_last_page = self.check_last_page()
  45. # 保存到数据库
  46. return self.save_search_result(keyword, start, url, html_path, is_last_page)
  47. def check_last_page(self):
  48. tree = html.fromstring(self.page.html)
  49. # 检查 id="search" 是否存在
  50. search_element = tree.xpath('//*[@id="search"]')
  51. # 检查 id="pnnext" 是否存在
  52. pnnext_element = tree.xpath('//*[@id="pnnext"]')
  53. # 如果 id="search" 存在,且 id="pnnext" 不存在,则说明是最后一页
  54. if pnnext_element:
  55. return False
  56. elif search_element and not pnnext_element:
  57. return True # 是最后一页
  58. else:
  59. raise ValueError("网页错误,无法确定是否是最后一页。")
  60. def walk_search_one_keywords(self, keyword_model: KeywordModel, start: int = 0, pages_num: int = 250, cache=True, skip_exist=True):
  61. keyword= keyword_model.key_word
  62. for current_start in range(start, pages_num, 10):
  63. search_result:SearchResult = self.search_keyword(keyword, current_start, cache)
  64. if search_result.is_last_page:
  65. print(f"Reached last page for {keyword} at start={current_start}")
  66. self.excel_db_manager.mark_keyword_done(keyword)
  67. break
  68. def save_search_result(self, keyword: str, start: int, url: str, html_path: str, is_last_page: bool = False) -> SearchResult:
  69. """保存搜索结果到数据库
  70. Args:
  71. keyword: 搜索关键词
  72. start: 起始位置
  73. url: 搜索URL
  74. html_path: 保存的HTML文件路径
  75. is_last_page: 是否是最后一页
  76. Returns:
  77. 数据库中的SearchResult记录
  78. """
  79. return self.search_db_manager.save_search_result(
  80. keyword=keyword,
  81. start=start,
  82. url=url,
  83. html_path=str(html_path),
  84. is_last_page=is_last_page
  85. )
  86. async def next_page(self, keyword: str, current_start: int, cache: bool = True) -> list[str]:
  87. """翻到下一页并返回结果链接
  88. Args:
  89. keyword: 要搜索的关键词
  90. current_start: 当前起始结果位置
  91. cache: 是否使用缓存
  92. Returns:
  93. 包含所有搜索结果链接的列表
  94. """
  95. # 检查是否是最后一页
  96. existing = self.search_db_manager.get_existing_result(keyword, current_start)
  97. if existing and existing.is_last_page:
  98. print(f"Reached last page for {keyword} at start={current_start}")
  99. return []
  100. return await self.search_keyword(keyword, current_start + 10, cache)
  101. def save_page(self, keyword: str, start: int) -> Path:
  102. """保存当前页面"""
  103. save_dir = GOOGLE_SEARCH_DIR / keyword
  104. ensure_output_dir(save_dir)
  105. save_path = save_dir / f"{start}.html"
  106. save_to_file(self.page.html, save_path)
  107. return save_path
  108. async def _process_page(self, url: str) -> CrawlResult:
  109. """处理页面内容"""
  110. async with AsyncWebCrawler(config=browser_config) as crawler:
  111. return await crawler.arun(
  112. url=url,
  113. cache_mode=CacheMode.ENABLED,
  114. user_agent='random'
  115. )
  116. def is_search_result_empty(self, html_content: str) -> bool:
  117. """检查搜索结果是否为空"""
  118. tree = html.fromstring(html_content)
  119. search_elements = tree.xpath('//*[@id="search"]/*')
  120. return len(search_elements) == 0
  121. def go_to_next_page(self) -> bool:
  122. """跳转到下一页"""
  123. next_button = self.page.ele('#pnnext', timeout=1)
  124. if not next_button:
  125. return False
  126. next_button.click()
  127. return True
  128. def go_to_prev_page(self) -> bool:
  129. """跳转到上一页"""
  130. prev_button = self.page.ele('#pnprev', timeout=1)
  131. if not prev_button:
  132. return False
  133. prev_button.click()
  134. return True
  135. def extract_search_results(self, html_content: str) -> list[str]:
  136. """从搜索结果页面提取所有链接
  137. Args:
  138. html_content: 页面HTML内容
  139. Returns:
  140. 包含所有搜索结果链接的列表
  141. """
  142. tree = html.fromstring(html_content)
  143. rso = tree.xpath('//*[@id="search"]//*[@id="rso"]')[0]
  144. links = []
  145. for element in rso.xpath('.//*[@href]'):
  146. href = element.get('href')
  147. if href and not href.startswith('#'):
  148. links.append(href)
  149. return links
  150. def test_one():
  151. global page
  152. # 初始化浏览器
  153. self = SearchManager(page)
  154. key_model_list = self.excel_db_manager.get_keywords_by_status()
  155. key_model = key_model_list.pop(0)
  156. print(key_model)
  157. self.walk_search_one_keywords(key_model)
  158. async def main():
  159. create_db_and_tables()
  160. # test_one()
  161. # global page
  162. # self = SearchManager(page)
  163. # self.search_db_manager.get_search_results("python", 0)
  164. # 搜索关键词
  165. # res = manager.search_keyword(keyword_model.key_word, cache=True)
  166. # print(f"Found results: {res.model_dump_json(indent=4)} ")
  167. if __name__ == "__main__":
  168. asyncio.run(main())