google_search.py

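"""Google search scraping worker.

Drives a stealth Firefox (Camoufox) browser through Playwright, submits a
keyword, walks the paginated results, parses each SERP with scrapling, and
persists parsed items plus raw HTML snapshots via SearchResultManager.
"""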
import asyncio
import re
import time
from pathlib import Path

from camoufox import AsyncCamoufox
from playwright.async_api import Page
from pydantic import BaseModel
from scrapling import Adaptor

from config.settings import OUTPUT_DIR
from mylib.base import save_to_file
from mylib.logu import logger
from worker.search_engine.camoufox_broswer import BrowserConfig, BrowserCore
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem, KeywordTask
from worker.search_engine.smart_selector import get_search_ele


async def async_input(prompt: str) -> str:
    # Run the blocking input() in the default executor so the event loop
    # stays responsive while waiting for the user.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, input, prompt)


class SearchResultEle(BaseModel):
    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []


class GoogleSearchHandler:
    def __init__(self, page: Page):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    async def save_current_page(self, keyword: str, filename: str | None = None):
        # Resolve the timestamp at call time; a time.strftime() default
        # argument would be evaluated only once, at import time.
        filename = filename or time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(await self.page.content(), html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    async def _process_single_page(self, keyword: str) -> SearchResultEle:
        content = await self.page.content()
        result_ele = self.get_search_result_ele(content)
        if not result_ele.search_div:
            logger.warning(f"Search results container not found; likely a captcha page. keyword: {keyword}")
            return result_ele
        html_path = await self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )
        return result_ele

    async def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False) -> KeywordTask | None:
        key_model = self.db_manager.get_keyword_task(keyword)
        if skip_existing and key_model:
            logger.info(f"Keyword task already processed, skipping: {keyword}")
            return key_model
        # Drop any stale data and recreate the task from scratch.
        if key_model:
            self.db_manager.delete_keyword_task(keyword)
        key_model = self.db_manager.create_keyword_task(keyword)
        await self.search(keyword)
        search_result_item_count = 0
        should_complete = False  # whether the completion condition was met
        while True:
            result_ele = await self._process_single_page(keyword)
            # Bail out on captcha pages and other abnormal states.
            if not result_ele.search_div:
                break
            search_result_item_count += len(result_ele.results) if result_ele.results else 0
            # Done once enough items are collected or there is no next page.
            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
                should_complete = True
                break
            try:
                await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                await asyncio.sleep(3)
                await self.page.click("//a[@id='pnnext']", timeout=10000)
                logger.info(f"self.page.url {self.page.url}")
                await self.page.wait_for_load_state('load', timeout=10000)
            except Exception as e:
                logger.warning(f"Pagination failed: {e}")
                break
        # Mark the task completed only on a normal finish; return None on
        # interruption so the caller can detect the incomplete task.
        if should_complete:
            key_model = self.db_manager.mark_task_completed(keyword)
            logger.info(f"Keyword processed successfully: {keyword}")
            return key_model
        logger.warning(f"Keyword processing was interrupted: {keyword}")
        return None

    async def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            await self.page.goto(url)
            await self.page.wait_for_load_state('load', timeout=10000)
        if 'sorry/' in self.page.url:
            # Google redirects to a /sorry/ URL when it serves a captcha.
            user_input = await async_input("Captcha detected. Solve it in the browser and enter 'y' to retry: ")
            if user_input.lower() == 'y':
                await self.goto_home_page()
                return
            raise Exception(f"Captcha detected, user exited; switch identity and retry. {self.page.url}")

    def find_search_div(self, html_content: str) -> bool:
        return bool(Adaptor(html_content).xpath_first('//div[@id="search"]'))

    async def search(self, query: str) -> str:
        await self.goto_home_page()
        search_ele_dict = get_search_ele(await self.page.content())
        if not search_ele_dict:
            raise Exception("Search box not found")
        textarea = self.page.locator(search_ele_dict['xpath'])
        await textarea.fill(query, timeout=10000)
        await textarea.press('Enter')
        await self.page.wait_for_load_state(state='load', timeout=10000)
        return await self.page.content()

    def get_current_page_num(self) -> int | None:
        # Google paginates with &start=N at 10 results per page,
        # e.g. '&start=20' means page 3; no start parameter means page 1.
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return None

    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[],
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = bool(search_div)
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
        if not search_div:
            return res
        # Each organic result block carries a data-snc attribute.
        result_list = body.xpath('//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}, next_page_url: {next_page_url}")
        for result_item in result_list:
            if len(result_item.children) < 2:
                continue
            result = SearchResultItem()
            # First child holds the link and title, second holds the snippet.
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
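

# Illustrative shape of a parsed page (values are made up, not real output):
# SearchResultEle(
#     search_div=True,
#     next_page_url='https://www.google.com/search?q=...&start=10',
#     current_page=1,
#     results=[SearchResultItem(url='https://example.com/', title='Example', content='snippet text')],
# )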


async def search_keyword(keyword, max_result_items=200, skip_existing=False, config: BrowserConfig = BrowserConfig()):
    ret = {'error': 0, 'msg': '', 'data': None}
    config_dict = config.model_dump()
    logger.info(f"BrowserConfig {config_dict}")
    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
    async with AsyncCamoufox(**config_dict) as browser:
        # Create the handler outside the try block so the except clause can
        # still reference it to snapshot the failing page.
        search_handler = GoogleSearchHandler(await browser.new_page())
        try:
            kw = await search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
            if not kw:
                ret['error'] = 1
                html_path = await search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
                logger.warning(f"Keyword task not completed: {keyword} html_path: {html_path}")
                ret['msg'] = f"Keyword task not completed: {keyword}"
                ret['data'] = html_path
                return ret
            ret['data'] = kw.model_dump()
            return ret
        except Exception as e:
            html_path = await search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
            logger.exception(f"Failed: {e} html_path: {html_path}")
            ret['error'] = 1
            ret['msg'] = f"Failed: {e}"
            ret['data'] = html_path
            return ret
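

# Minimal usage sketch (assumes the default BrowserConfig is sufficient):
#     ret = asyncio.run(search_keyword('Acampe carinata essential oil',
#                                      max_result_items=50, skip_existing=True))
#     if ret['error']:
#         print(ret['msg'], ret['data'])  # data holds the debug snapshot path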


async def aio_main(config: BrowserConfig = BrowserConfig()):
    # Interactive debugging entry point: open a browser on the Google home
    # page and keep it alive for manual inspection.
    try:
        async with BrowserCore(config) as core:
            search_handler = GoogleSearchHandler(core.page)
            await search_handler.goto_home_page()
            keywords = [  # sample keywords for manual testing
                'Acampe carinata essential oil',
            ]
            while True:
                await asyncio.sleep(1)
    except Exception as e:
        logger.error(f"Failed: {e}")


def analyze():
    # Parse a previously saved SERP snapshot offline, without a browser.
    html_file = Path(r"K:\code\upwork\zhang_crawl_bio\output\results\Acampe_rigida_essential_oil\page_1.html")

    class TestPage:
        # Minimal stand-in for a Playwright Page; only .url is read here.
        url = html_file.as_uri()

    search_handler = GoogleSearchHandler(page=TestPage())
    res = search_handler.get_search_result_ele(html_file.read_text())
    logger.info(f"{res.model_dump_json(indent=4)}")


def main():
    asyncio.run(aio_main())


if __name__ == "__main__":
    main()
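
# To parse a saved snapshot offline instead of launching a browser,
# call analyze() after pointing html_file at a local results page.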