# drission_google_search.py
  1. import time
  2. import re
  3. import logging
  4. from pathlib import Path
  5. from typing import Dict, Optional
  6. from DrissionPage import ChromiumPage
  7. from pydantic import BaseModel
  8. from scrapling import Adaptor
  9. from mylib.logu import logger
  10. from mylib.base import save_to_file
  11. from config.settings import GOOGLE_SEARCH_DIR
  12. from mylib.drission_page import load_chrome_from_ini
  13. from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
  14. from worker.search_engine.smart_selector import get_search_ele
  15. from DrissionPage.common import Keys
  16. class SearchResultEle(BaseModel):
  17. search_div: bool | None = None
  18. next_page_url: str | None = None
  19. current_page: int | None = None
  20. results: list[SearchResultItem] = []
  21. class GoogleSearchHandlerDrission:
  22. def __init__(self, page: ChromiumPage):
  23. self.page = page
  24. self.db_manager = SearchResultManager()
  25. self.save_dir = GOOGLE_SEARCH_DIR
  26. def save_current_page(self, keyword: str, filename: str=time.strftime("%Y%m%d_%H%M%S")) -> Path:
  27. html_dir = self.save_dir / keyword
  28. html_dir.mkdir(parents=True, exist_ok=True)
  29. html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
  30. logger.info(f"save_to_file {html_path}")
  31. return html_path
  32. def _process_single_page(self, keyword: str) -> SearchResultEle:
  33. content = self.page.html
  34. result_ele = self.get_search_result_ele(content)
  35. if not result_ele.search_div:
  36. logger.warning(f"未找到搜索结果容器,可能遇到验证页面 keyword: {keyword}")
  37. return result_ele
  38. html_path = self.save_current_page(keyword, filename=f"{result_ele.current_page}")
  39. page_result = self.db_manager.save_page_results(
  40. keyword=keyword,
  41. page_number=result_ele.current_page,
  42. results_count=len(result_ele.results) if result_ele.results else 0,
  43. has_next_page=bool(result_ele.next_page_url),
  44. html_path=html_path
  45. )
  46. if result_ele.results:
  47. self.db_manager.save_result_items(
  48. keyword=keyword,
  49. page_id=page_result.id,
  50. items=result_ele.results,
  51. html_path=None, # 当前函数的 html_path 是 page 的 html_path,不是 results items 的 html_path,每个 items 有各自的 url 链接,对应各自的 html_path
  52. )
  53. return result_ele
  54. def check_keyword(self, keyword: str, skip_existing: bool) -> tuple[bool, KeywordTask]:
  55. key_model = self.db_manager.get_keyword_task(keyword)
  56. if skip_existing and key_model and key_model.is_completed:
  57. logger.info(f"关键词任务已完成,跳过处理: id={key_model.id} {keyword}")
  58. return True, key_model
  59. if key_model:
  60. self.db_manager.delete_keyword_task(keyword)
  61. key_model = self.db_manager.create_keyword_task(keyword)
  62. return False, key_model
  63. def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
  64. _, key_model = self.check_keyword(keyword, skip_existing)
  65. self.search(keyword)
  66. search_result_item_count = 0
  67. should_complete = False
  68. while True:
  69. result_ele = self._process_single_page(keyword)
  70. if not result_ele.search_div:
  71. break
  72. search_result_item_count += len(result_ele.results) if result_ele.results else 0
  73. if search_result_item_count >= max_result_items or not result_ele.next_page_url:
  74. should_complete = True
  75. break
  76. try:
  77. self.page.scroll.to_bottom()
  78. time.sleep(3)
  79. next_btn = self.page.ele('#pnnext')
  80. if next_btn:
  81. next_btn.click()
  82. logger.info(f"跳转到下一页: {self.page.url}")
  83. self.page.wait.load_start()
  84. else:
  85. break
  86. except Exception as e:
  87. logger.warning(f"翻页失败: {str(e)}")
  88. break
  89. if should_complete:
  90. key_model = self.db_manager.mark_task_completed(keyword)
  91. logger.info(f"正常完成关键词处理: {keyword}")
  92. else:
  93. logger.warning(f"关键词处理被中断: {keyword}")
  94. return key_model
  95. def goto_home_page(self):
  96. url = "https://www.google.com"
  97. logger.info(f"{self.page.url}")
  98. if self.page.url != url:
  99. self.page.get(url,timeout=1)
  100. logger.info(f"{self.page.url}")
  101. if 'sorry/' in self.page.url:
  102. raise Exception(f"出现人机验证,正在换身份重试。。 {self.page.url}")
  103. def search(self, query: str):
  104. self.goto_home_page()
  105. search_ele_dict = get_search_ele(self.page.html)
  106. logger.info(f"smart xpath: {search_ele_dict}")
  107. if not search_ele_dict:
  108. raise Exception("未找到搜索框")
  109. search_box = self.page.ele(f'xpath:{search_ele_dict['xpath']}')
  110. search_box.input(query)
  111. self.page.actions.type(Keys.ENTER)
  112. self.page.wait.load_start()
  113. def get_current_page_num(self) -> int:
  114. if '/search?q=' in self.page.url:
  115. match = re.search(r'&start=(\d+)', self.page.url)
  116. return int(match.group(1)) // 10 + 1 if match else 1
  117. def get_search_result_ele(self, html_content: str):
  118. res = SearchResultEle(
  119. search_div=None,
  120. next_page_url=None,
  121. current_page=self.get_current_page_num(),
  122. results=[]
  123. )
  124. page = Adaptor(html_content)
  125. body = Adaptor(page.body)
  126. search_div = body.xpath('//div[@id="search"]')
  127. next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
  128. res.search_div = bool(search_div)
  129. res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
  130. if not search_div:
  131. return res
  132. result_list = search_div.xpath('//*[@data-snc]')
  133. logger.info(f"当前页结果数量: {len(result_list)}, next_page_url: {next_page_url}")
  134. for result_item in result_list:
  135. if len(result_item.children) < 2:
  136. continue
  137. result = SearchResultItem()
  138. title_ele = result_item.children[0]
  139. if title_ele:
  140. result.url = title_ele.xpath_first('.//a/@href')
  141. result.title = title_ele.xpath_first('.//h3/text()')
  142. content_ele = result_item.children[1]
  143. if content_ele:
  144. content_list = content_ele.xpath('.//span/text()')
  145. result.content = ''.join(content_list) if content_list else None
  146. if any([result.url, result.title, result.content]):
  147. res.results.append(result)
  148. return res
  149. def search_keyword_drission(keyword: str, max_result_items: int = 200, skip_existing: bool = True, browser_config: Optional[Dict] = None):
  150. ret = {'error': 0, 'msg': '', 'data': None}
  151. logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
  152. search_handler = GoogleSearchHandlerDrission(None)
  153. exist, keyword_model = search_handler.check_keyword(keyword, skip_existing)
  154. if exist and keyword_model.is_completed:
  155. ret['data'] = keyword_model.model_dump()
  156. return ret
  157. try:
  158. page = load_chrome_from_ini(**browser_config) if browser_config else load_chrome_from_ini()
  159. logger.info(f"proxy {page._browser._chromium_options.proxy}")
  160. search_handler = GoogleSearchHandlerDrission(page)
  161. kw = search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
  162. if not kw:
  163. ret['error'] = 1
  164. html_path = search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
  165. logger.warning(f"关键词任务未完成: {keyword} html_path: {html_path}")
  166. ret['msg'] = f"关键词任务未完成: {keyword}"
  167. ret['data'] = html_path
  168. else:
  169. ret['data'] = kw.model_dump()
  170. page.quit()
  171. return ret
  172. except Exception as e:
  173. html_path = search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
  174. logger.exception(f"失败: {str(e)} html_path: {html_path}")
  175. ret['error'] = 1
  176. ret['msg'] = f"失败: {str(e)}"
  177. ret['data'] = html_path
  178. return ret
  179. finally:
  180. page.quit()
  181. def main():
  182. browser_config = {
  183. 'proxy': 'http://127.0.0.1:9360',
  184. 'headless': False
  185. }
  186. res = search_keyword_drission("Acalypha nusbaumeri essential oil", max_result_items=200, skip_existing = False,browser_config=browser_config)
  187. logger.info(f"{res}")
  188. if __name__ == "__main__":
  189. main()