search_manager.py

from pathlib import Path
import asyncio

from DrissionPage import ChromiumPage
from crawl4ai import AsyncWebCrawler, CacheMode, CrawlResult
from lxml import html

from mylib.base import (
    save_to_file,
    ensure_output_dir,
    replace_space,
    browser_config,
)
from mylib.drission_page import load_chrome_from_ini, load_random_ua_chrome
from database.search_model import SearchDatabaseManager, SearchResult
from database.excel_import import ExcelDatabaseManager, KeywordModel
from database.sqlite_engine import create_db_and_tables, drop_table
from config.settings import GOOGLE_SEARCH_DIR


class PageAccessError(Exception):
    """Custom exception for page access errors."""

    def __init__(self, message, html_path=None, screenshot_path=None):
        super().__init__(message)
        self.html_path = html_path
        self.screenshot_path = screenshot_path


class SearchManager:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.tab = page.latest_tab
        self.search_db_manager = SearchDatabaseManager()
        self.excel_db_manager = ExcelDatabaseManager()

    def search_keyword(self, keyword: str, start: int = 0, cache: bool = True) -> SearchResult:
        """Search a keyword and return the saved result record.

        Args:
            keyword: Keyword to search for.
            start: Offset of the first result (Google paginates in steps of 10).
            cache: Whether to reuse a previously stored result.

        Returns:
            The SearchResult record, including the saved HTML file path.
        """
        # Check the cache first; only report a hit when one actually exists.
        if cache:
            existing_result = self.search_db_manager.get_existing_result(keyword, start)
            if existing_result:
                print(f"Using existing result for {keyword} {start}")
                return existing_result
        url = f"https://www.google.com/search?q={keyword}"
        if start == 0:
            # Run the initial search.
            self.tab.get(url)
        else:
            # Later pages are reached by clicking "next" from the current page.
            self.go_to_next_page()
        # Save the HTML file.
        html_path = self.save_page(keyword, start)
        # Check whether this is the last result page.
        is_last_page = self.check_last_page()
        # Persist the result to the database.
        return self.save_search_result(keyword, start, url, html_path, is_last_page)
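
    # Usage sketch (hedged): assumes a browser session from
    # load_random_ua_chrome() and an initialized database; the keyword is
    # illustrative, not from the original code.
    #
    #     manager = SearchManager(load_random_ua_chrome())
    #     result = manager.search_keyword("drission page tutorial", start=0)
    #     print(result.html_path, result.is_last_page)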

    def check_last_page(self) -> bool:
        tree = html.fromstring(self.tab.html)
        # Does id="search" exist, i.e. did a results page actually load?
        search_element = tree.xpath('//*[@id="search"]')
        # Does id="pnnext" (the "next page" link) exist?
        pnnext_element = tree.xpath('//*[@id="pnnext"]')
        # A results page without a "next" link is the last page.
        if pnnext_element:
            return False
        elif search_element:
            return True  # last page
        else:
            raise PageAccessError("Page error: cannot determine whether this is the last page")
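
    # How the three outcomes map onto page states (illustrative markup,
    # not real Google responses):
    #
    #     '<div id="search"></div><a id="pnnext"></a>'  -> False (more pages follow)
    #     '<div id="search"></div>'                     -> True  (last page)
    #     '<html></html>'                               -> PageAccessError (e.g. a block/CAPTCHA page)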

    def walk_search_one_keywords(self, keyword_model: KeywordModel, start: int = 0, pages_num: int = 250, cache=True, skip_exist=True):
        """Walk all result pages for one keyword, saving each page as it goes."""
        keyword = keyword_model.key_word
        for current_start in range(start, pages_num, 10):
            search_result: SearchResult = self.search_keyword(keyword, current_start, cache)
            if search_result.is_last_page:
                print(f"Reached last page for {keyword} at start={current_start}")
                self.excel_db_manager.mark_keyword_done(keyword)
                break
            # No explicit navigation here: search_keyword() already clicks
            # "next" when called with start > 0, so advancing again would
            # skip a page and mislabel the saved HTML.
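
    # With the defaults (start=0, pages_num=250) the walk requests offsets
    # 0, 10, 20, ..., 240, i.e. at most 25 result pages per keyword,
    # matching Google's 10-results-per-page pagination.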

    def save_search_result(self, keyword: str, start: int, url: str, html_path: str, is_last_page: bool = False) -> SearchResult:
        """Persist a search result to the database.

        Args:
            keyword: Search keyword.
            start: Result offset.
            url: Search URL.
            html_path: Path of the saved HTML file.
            is_last_page: Whether this is the last result page.

        Returns:
            The SearchResult record stored in the database.
        """
        return self.search_db_manager.save_search_result(
            keyword=keyword,
            start=start,
            url=url,
            html_path=str(html_path),
            is_last_page=is_last_page,
        )

    def next_page(self, keyword: str, current_start: int, cache: bool = True) -> SearchResult | None:
        """Advance to the next page and return its result record.

        Args:
            keyword: Keyword to search for.
            current_start: Offset of the current page.
            cache: Whether to reuse a previously stored result.

        Returns:
            The SearchResult for the next page, or None when the current
            page is already the last one.
        """
        # Stop if the current page is already known to be the last one.
        existing = self.search_db_manager.get_existing_result(keyword, current_start)
        if existing and existing.is_last_page:
            print(f"Reached last page for {keyword} at start={current_start}")
            return None
        return self.search_keyword(keyword, current_start + 10, cache)

    def save_page(self, keyword: str, start: int | str) -> Path:
        """Save the current page's HTML; `start` may also be a label such as "error_0"."""
        save_dir = GOOGLE_SEARCH_DIR / keyword
        ensure_output_dir(save_dir)
        save_path = save_dir / f"{start}.html"
        save_to_file(self.tab.html, save_path)
        return save_path

    async def _process_page(self, url: str) -> CrawlResult:
        """Fetch and process a page with crawl4ai."""
        async with AsyncWebCrawler(config=browser_config) as crawler:
            return await crawler.arun(
                url=url,
                cache_mode=CacheMode.ENABLED,
                user_agent='random',
            )

    def is_search_result_empty(self, html_content: str) -> bool:
        """Check whether the search results container is empty."""
        tree = html.fromstring(html_content)
        search_elements = tree.xpath('//*[@id="search"]/*')
        return len(search_elements) == 0
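
    # e.g. is_search_result_empty('<div id="search"></div>')                -> True
    #      is_search_result_empty('<div id="search"><div>hit</div></div>')  -> False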

    def go_to_next_page(self) -> bool:
        """Click through to the next result page."""
        next_button = self.tab.ele('#pnnext', timeout=1)
        if not next_button:
            return False
        next_button.click()
        return True

    def go_to_prev_page(self) -> bool:
        """Click back to the previous result page."""
        prev_button = self.tab.ele('#pnprev', timeout=1)
        if not prev_button:
            return False
        prev_button.click()
        return True

    def extract_search_results(self, html_content: str) -> list[str]:
        """Extract all result links from a search results page.

        Args:
            html_content: HTML content of the page.

        Returns:
            A list of all result links on the page.
        """
        tree = html.fromstring(html_content)
        rso = tree.xpath('//*[@id="search"]//*[@id="rso"]')[0]
        links = []
        for element in rso.xpath('.//*[@href]'):
            href = element.get('href')
            if href and not href.startswith('#'):
                links.append(href)
        return links
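
    # Usage sketch on minimal, hypothetical markup (real result pages nest
    # links far deeper, but the id="rso" container is the same):
    #
    #     sample = ('<div id="search"><div id="rso">'
    #               '<a href="https://example.com">hit</a>'
    #               '<a href="#frag">skipped</a></div></div>')
    #     manager.extract_search_results(sample)  # -> ['https://example.com']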

    def restart_browser(self):
        """Quit the current browser and start a fresh one with a random UA."""
        self.page.quit()
        self.page = load_random_ua_chrome()
        self.tab = self.page.latest_tab


def test_one():
    # Initialize the browser.
    manager = SearchManager(load_random_ua_chrome())
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    key_model = key_model_list.pop(0)
    print(key_model)
    manager.walk_search_one_keywords(key_model)


def test_all():
    # Initialize the browser.
    manager = SearchManager(load_random_ua_chrome())
    key_model_list = manager.excel_db_manager.get_keywords_by_status()
    all_count = manager.excel_db_manager.get_keywords_count()
    print("Walking all keywords, len =", len(key_model_list))
    # Iterate over every keyword.
    for key_model in key_model_list:
        print('---------------------------')
        print(f"Processing keyword: {key_model.key_word}")
        print(f"Total: {all_count} / current id: {key_model.id}")
        try:
            manager.walk_search_one_keywords(key_model)
        except PageAccessError as e:
            print(e)
            # Save the current page's HTML plus a screenshot for debugging.
            start = 0
            error_html_path = manager.save_page(key_model.key_word, "error_" + str(start))
            screenshot_path = str(error_html_path.with_suffix('.png'))
            manager.tab.get_screenshot(path=screenshot_path)
            print(f"screenshot saved to {screenshot_path}")
            manager.restart_browser()


async def main():
    create_db_and_tables()
    test_all()
    # test_one()
    # global page
    # manager = SearchManager(page)
    # manager.search_db_manager.get_search_results("python", 0)
    # Search a keyword:
    # res = manager.search_keyword(keyword_model.key_word, cache=True)
    # print(f"Found results: {res.model_dump_json(indent=4)} ")


if __name__ == "__main__":
    asyncio.run(main())