# drission_google_search.py

import time
import re
from pathlib import Path
from DrissionPage import ChromiumPage
from DrissionPage.common import Keys
from pydantic import BaseModel
from scrapling import Adaptor
from mylib.logu import logger
from mylib.base import save_to_file
from config.settings import OUTPUT_DIR
from mylib.drission_page import load_chrome_from_ini
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
from worker.search_engine.smart_selector import get_search_ele
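

# Scrape Google search results with a real Chrome instance driven by
# DrissionPage: submit a keyword, parse each result page with scrapling,
# persist pages and result items through SearchResultManager, and follow
# the next-page link until max_result_items results have been collected.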
class SearchResultEle(BaseModel):
    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []


class GoogleSearchHandlerDrission:
    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    def save_current_page(self, keyword: str, filename: str | None = None) -> Path:
        # Compute the timestamp per call; a time.strftime() default in the
        # signature would be evaluated only once, at import time.
        filename = filename or time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    def _process_single_page(self, keyword: str) -> SearchResultEle:
        content = self.page.html
        result_ele = self.get_search_result_ele(content)
        if not result_ele.search_div:
            logger.warning(f"Search result container not found; this may be a verification page. keyword: {keyword}")
            return result_ele
        html_path = self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )
        return result_ele

    def check_keyword(self, keyword: str, skip_existing: bool) -> tuple[bool, KeywordTask]:
        key_model = self.db_manager.get_keyword_task(keyword)
        # Guard against a missing task record before touching its attributes.
        if key_model and skip_existing and key_model.is_completed:
            logger.info(f"Keyword task already completed, skipping: id={key_model.id} {keyword}")
            return True, key_model
        if key_model:
            self.db_manager.delete_keyword_task(keyword)
        key_model = self.db_manager.create_keyword_task(keyword)
        return False, key_model
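
    # The paging loop in process_keyword ends in one of three ways: the result
    # container is missing (likely a CAPTCHA page), max_result_items results
    # have been collected, or there is no next-page link. Only the latter two
    # mark the task as completed.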
    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
        _, key_model = self.check_keyword(keyword, skip_existing)
        self.search(keyword)
        search_result_item_count = 0
        should_complete = False
        while True:
            result_ele = self._process_single_page(keyword)
            if not result_ele.search_div:
                break
            search_result_item_count += len(result_ele.results) if result_ele.results else 0
            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
                should_complete = True
                break
            try:
                # Scroll the next-page link into view and give the page a
                # moment to settle before clicking it.
                self.page.scroll.to_bottom()
                time.sleep(3)
                next_btn = self.page.ele('#pnnext')
                if next_btn:
                    next_btn.click()
                    logger.info(f"Moving to next page: {self.page.url}")
                    self.page.wait.load_start()
                else:
                    break
            except Exception as e:
                logger.warning(f"Pagination failed: {str(e)}")
                break
        if should_complete:
            key_model = self.db_manager.mark_task_completed(keyword)
            logger.info(f"Keyword processed to completion: {keyword}")
        else:
            logger.warning(f"Keyword processing was interrupted: {keyword}")
        return key_model

    def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            self.page.get(url)
            self.page.wait.load_start()
        if 'sorry/' in self.page.url:
            # Google redirects to a /sorry/ URL when it serves a CAPTCHA page.
            raise Exception(f"Bot verification triggered; retrying with a new identity. {self.page.url}")

    def search(self, query: str):
        self.goto_home_page()
        search_ele_dict = get_search_ele(self.page.html)
        if not search_ele_dict:
            raise Exception("Search box not found")
        search_box = self.page.ele(f"xpath:{search_ele_dict['xpath']}")
        search_box.input(query)
        self.page.actions.type(Keys.ENTER)
        self.page.wait.load_start()
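
    # Google encodes the page number in the `start` query parameter in steps
    # of 10 results, so page = start // 10 + 1; e.g. `&start=20` is page 3,
    # and a results URL without `start` is page 1.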
    def get_current_page_num(self) -> int:
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return 1  # not on a results page; default to page 1
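
    # Parsing relies on three pieces of Google's SERP markup: div#search as
    # the results container, a#pnnext as the next-page link, and [data-snc]
    # attributes marking individual result blocks. These selectors reflect
    # Google's markup at the time of writing and can break when it changes.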
    def get_search_result_ele(self, html_content: str):
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[]
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = bool(search_div)
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
        if not search_div:
            return res
        # The //*[@data-snc] expression is absolute, so query from `body`
        # rather than from the node list returned by the previous xpath() call.
        result_list = body.xpath('//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}, next_page_url: {next_page_url}")
        for result_item in result_list:
            if len(result_item.children) < 2:
                continue
            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
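

# Module-level entry point returning an {'error': int, 'msg': str, 'data': ...}
# envelope: `data` is the KeywordTask dump on success, or the path of a saved
# HTML snapshot when the run fails or is interrupted. The DB check runs first
# so an already-completed keyword never launches a browser; the proxy address
# passed to load_chrome_from_ini below is hard-coded.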
def search_keyword_drission(keyword, max_result_items=200, skip_existing=True):
    ret = {'error': 0, 'msg': '', 'data': None}
    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
    # No browser is needed for the DB-only existence check.
    search_handler = GoogleSearchHandlerDrission(None)
    exist, keyword_model = search_handler.check_keyword(keyword, skip_existing)
    if exist and keyword_model.is_completed:
        ret['data'] = keyword_model.model_dump()
        return ret
    page = load_chrome_from_ini(proxy='http://127.0.0.1:1881')
    try:
        search_handler = GoogleSearchHandlerDrission(page)
        kw = search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
        if not kw:
            ret['error'] = 1
            html_path = search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
            logger.warning(f"Keyword task not completed: {keyword} html_path: {html_path}")
            ret['msg'] = f"Keyword task not completed: {keyword}"
            ret['data'] = html_path
            return ret
        ret['data'] = kw.model_dump()
        return ret
    except Exception as e:
        html_path = search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
        logger.exception(f"Failed: {str(e)} html_path: {html_path}")
        ret['error'] = 1
        ret['msg'] = f"Failed: {str(e)}"
        ret['data'] = html_path
        return ret
    finally:
        # Quit the browser on success and failure alike.
        page.quit()


def main():
    search_keyword_drission("drission", max_result_items=15)


if __name__ == "__main__":
    main()