# drission_google_search.py

from pathlib import Path
import time
import re

from DrissionPage import ChromiumPage
from DrissionPage.common import Keys
from pydantic import BaseModel
from scrapling import Adaptor

from mylib.logu import logger
from mylib.base import save_to_file
from mylib.drission_page import load_chrome_from_ini
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
from config.settings import OUTPUT_DIR


class SearchResultEle(BaseModel):
    """Parsed snapshot of a single Google results page."""

    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []


class GoogleSearchHandlerDrission:
    """Drive a DrissionPage-controlled Chrome session through Google search results."""

    def __init__(self, page: ChromiumPage):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    def save_current_page(self, keyword: str, filename: str | None = None) -> Path:
        """Dump the current page's HTML under <save_dir>/<keyword>/ and return the path."""
        # A time.strftime(...) default in the signature is evaluated only once at
        # import time, so compute the timestamp per call instead.
        if filename is None:
            filename = time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(self.page.html, html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    def _process_single_page(self, keyword: str) -> SearchResultEle:
        """Parse the currently loaded results page and persist it to the result DB."""
        content = self.page.html
        result_ele = self.get_search_result_ele(content)
        if not result_ele.search_div:
            logger.warning(f"Search result container not found, possibly a verification page. keyword: {keyword}")
            return result_ele
        html_path = self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )
        return result_ele

    def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False) -> KeywordTask:
        """Search Google for `keyword` and walk result pages until max_result_items
        results are collected or pagination ends; returns the stored KeywordTask."""
        if skip_existing:
            key_model = self.db_manager.get_keyword_task(keyword)
            if key_model:
                logger.info(f"Keyword task already completed, skipping: {keyword}")
                return key_model
        self.db_manager.create_keyword_task(keyword)
        self.search(keyword)
        search_result_item_count = 0
        while True:
            result_ele = self._process_single_page(keyword)
            search_result_item_count += len(result_ele.results) if result_ele.results else 0
            if search_result_item_count > max_result_items:
                logger.info(f"Keyword {keyword}: collected results exceed {max_result_items}, stopping pagination")
                break
            if not result_ele.next_page_url:
                break
            self.page.scroll.to_bottom()
            time.sleep(3)
            next_btn = self.page.ele('#pnnext')
            if not next_btn:
                break
            next_btn.click()
            logger.info(f"Moved to next page: {self.page.url}")
        # Re-query the task so a KeywordTask is also returned when skip_existing is
        # False (the original only bound key_model inside that branch).
        return self.db_manager.get_keyword_task(keyword)

    def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            self.page.get(url)
        if 'sorry/' in self.page.url:
            raise Exception("Google served a CAPTCHA page; manual intervention required")

    def search(self, query: str):
        self.goto_home_page()
        search_box = self.page.ele('textarea')
        search_box.input(query)
        self.page.actions.type(Keys.ENTER)

    def get_current_page_num(self) -> int:
        # Google paginates via &start=<offset> in steps of 10; no parameter means page 1.
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return 1

    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        """Parse a results page with scrapling and extract title/url/snippet triples."""
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[],
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath_first('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = search_div is not None
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
        if search_div is None:
            return res
        # Each organic result block carries a data-snc attribute; query relative to
        # the search container rather than the whole document.
        result_list = search_div.xpath('.//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}")
        for result_item in result_list:
            if len(result_item.children) < 2:
                continue
            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
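

# --- Optional debugging aid (not in the original module) --------------------
# A minimal sketch for re-parsing a results page that save_current_page wrote to
# disk, to sanity-check the XPath selectors above without a live browser. The
# function name and the utf-8 assumption are illustrative additions.
def debug_parse_saved_page(html_path: Path) -> int:
    body = Adaptor(Path(html_path).read_text(encoding='utf-8'))
    items = body.xpath('//div[@id="search"]//*[@data-snc]')
    logger.info(f"parsed {len(items)} result blocks from {html_path}")
    return len(items)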


def search_keyword_drission(keyword: str, max_result_items: int = 1, skip_existing: bool = False):
    page = load_chrome_from_ini(proxy='http://localhost:1881')
    try:
        handler = GoogleSearchHandlerDrission(page)
        return handler.process_keyword(keyword, max_result_items, skip_existing)
    except Exception:
        logger.exception(f"Keyword processing failed: {keyword}")
        raise


def main():
    search_keyword_drission("drission")


if __name__ == "__main__":
    main()
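

# A minimal batch-run sketch built on search_keyword_drission above; the keyword
# list, the skip_existing choice, and the pause between keywords are illustrative
# assumptions rather than part of the original script.
def run_keywords(keywords: list[str], max_result_items: int = 200) -> None:
    for kw in keywords:
        try:
            search_keyword_drission(kw, max_result_items=max_result_items, skip_existing=True)
        except Exception:
            logger.exception(f"Keyword failed, moving on to the next one: {kw}")
        time.sleep(5)  # brief pause between keywords to reduce the chance of a CAPTCHA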