import asyncio
import re
import time
from pathlib import Path

from camoufox import AsyncCamoufox
from playwright.async_api import Page
from pydantic import BaseModel
from scrapling import Adaptor

from config.settings import OUTPUT_DIR
from mylib.base import save_to_file
from mylib.logu import logger
from worker.search_engine.camoufox_broswer import BrowserConfig, BrowserCore
from worker.search_engine.search_result_db import SearchResultManager, SearchResultItem
from worker.search_engine.smart_selector import get_search_ele


async def async_input(prompt):
    """Run the blocking input() in an executor so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, input, prompt)


class SearchResultEle(BaseModel):
    search_div: bool | None = None
    next_page_url: str | None = None
    current_page: int | None = None
    results: list[SearchResultItem] = []


class GoogleSearchHandler:
    def __init__(self, page: Page):
        self.page = page
        self.db_manager = SearchResultManager()
        self.save_dir = OUTPUT_DIR / 'results'

    async def save_current_page(self, keyword: str, filename: str | None = None):
        # The timestamp must be computed per call; a time.strftime() default
        # argument would be evaluated only once, at import time.
        filename = filename or time.strftime("%Y%m%d_%H%M%S")
        html_dir = self.save_dir / keyword
        html_dir.mkdir(parents=True, exist_ok=True)
        html_path = save_to_file(await self.page.content(), html_dir / f"{filename}.html")
        logger.info(f"save_to_file {html_path}")
        return html_path

    async def _process_single_page(self, keyword: str) -> SearchResultEle:
        content = await self.page.content()
        result_ele = self.get_search_result_ele(content)
        if not result_ele.search_div:
            logger.warning(f"Search results container not found; likely a verification page. keyword: {keyword}")
            return result_ele

        html_path = await self.save_current_page(keyword, filename=f"{result_ele.current_page}")
        page_result = self.db_manager.save_page_results(
            keyword=keyword,
            page_number=result_ele.current_page,
            results_count=len(result_ele.results) if result_ele.results else 0,
            has_next_page=bool(result_ele.next_page_url),
            html_path=html_path,
        )
        if result_ele.results:
            self.db_manager.save_result_items(
                keyword=keyword,
                page_id=page_result.id,
                items=result_ele.results,
                html_path=html_path,
            )
        return result_ele

    async def process_keyword(self, keyword: str, max_result_items: int = 200, skip_existing: bool = False):
        key_model = self.db_manager.get_keyword_task(keyword)
        if skip_existing and key_model:
            logger.info(f"Keyword task already completed, skipping: {keyword}")
            return key_model

        # Drop stale data and recreate the task.
        if key_model:
            self.db_manager.delete_keyword_task(keyword)
        key_model = self.db_manager.create_keyword_task(keyword)

        await self.search(keyword)
        search_result_item_count = 0
        should_complete = False  # whether the completion condition was met
        while True:
            result_ele = await self._process_single_page(keyword)
            # Abort on verification pages and other abnormal states.
            if not result_ele.search_div:
                break
            search_result_item_count += len(result_ele.results) if result_ele.results else 0
            # Mark complete once the result cap is reached or there is no next page.
            if search_result_item_count >= max_result_items or not result_ele.next_page_url:
                should_complete = True
                break
            try:
                await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                await asyncio.sleep(3)
                await self.page.click("//a[@id='pnnext']", timeout=10000)
                logger.info(f"self.page.url {self.page.url}")
                await self.page.wait_for_load_state('load', timeout=10000)
            except Exception as e:
                logger.warning(f"Pagination failed: {str(e)}")
                break

        # Only mark the task completed on a normal finish.
        if should_complete:
            key_model = self.db_manager.mark_task_completed(keyword)
            logger.info(f"Keyword processed to completion: {keyword}")
        else:
            logger.warning(f"Keyword processing was interrupted: {keyword}")
        return key_model

    async def goto_home_page(self):
        url = "https://www.google.com"
        if self.page.url != url:
            await self.page.goto(url)
            await self.page.wait_for_load_state('load', timeout=10000)
        if 'sorry/' in self.page.url:
            user_input = await async_input("CAPTCHA page detected. Retry after solving it manually? (y/n): ")
            if user_input.lower() == 'y':
                # Retry once the user has dealt with the verification manually.
                await self.goto_home_page()
            else:
                raise Exception(f"CAPTCHA page detected; aborting so a new identity can be tried. {self.page.url}")

    def find_search_div(self, html_content: str) -> bool:
        return bool(Adaptor(html_content).xpath_first('//div[@id="search"]'))

    async def search(self, query: str) -> str:
        await self.goto_home_page()
        search_ele_dict = get_search_ele(await self.page.content())
        if not search_ele_dict:
            raise Exception("Search box not found")
        textarea = self.page.locator(search_ele_dict['xpath'])
        await textarea.fill(query, timeout=10000)
        await textarea.press('Enter')
        await self.page.wait_for_load_state(state='load', timeout=10000)
        return await self.page.content()

    def get_current_page_num(self) -> int | None:
        # Google paginates via &start=<offset>, 10 results per page.
        if '/search?q=' in self.page.url:
            match = re.search(r'&start=(\d+)', self.page.url)
            return int(match.group(1)) // 10 + 1 if match else 1
        return None

    def get_search_result_ele(self, html_content: str) -> SearchResultEle:
        res = SearchResultEle(
            search_div=None,
            next_page_url=None,
            current_page=self.get_current_page_num(),
            results=[],
        )
        page = Adaptor(html_content)
        body = Adaptor(page.body)
        search_div = body.xpath('//div[@id="search"]')
        next_page_url = body.xpath_first('//a[@id="pnnext"]/@href')
        res.search_div = bool(search_div)
        res.next_page_url = f"https://www.google.com{next_page_url}" if next_page_url else None
        if not search_div:
            return res
        result_list = search_div.xpath('//*[@data-snc]')
        logger.info(f"Results on current page: {len(result_list)}, next_page_url: {next_page_url}")
        for result_item in result_list:
            if len(result_item.children) < 2:
                continue
            result = SearchResultItem()
            title_ele = result_item.children[0]
            if title_ele:
                result.url = title_ele.xpath_first('.//a/@href')
                result.title = title_ele.xpath_first('.//h3/text()')
            content_ele = result_item.children[1]
            if content_ele:
                content_list = content_ele.xpath('.//span/text()')
                result.content = ''.join(content_list) if content_list else None
            if any([result.url, result.title, result.content]):
                res.results.append(result)
        return res
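# A minimal sketch of driving GoogleSearchHandler with plain Playwright instead of
# Camoufox: the handler only depends on the async Page API, so any Playwright page
# works. Vanilla Chromium is much more likely to hit Google's CAPTCHA wall, so this
# is only for local debugging; the headed launch and the 50-item cap below are
# illustrative assumptions, not part of the production setup.
async def debug_with_playwright(keyword: str):
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        # Headed mode makes it possible to solve CAPTCHAs by hand.
        browser = await p.chromium.launch(headless=False)
        page = await browser.new_page()
        try:
            handler = GoogleSearchHandler(page)
            return await handler.process_keyword(keyword, max_result_items=50)
        finally:
            await browser.close()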
async def search_keyword(keyword: str, max_result_items=200, skip_existing=False, config: BrowserConfig = BrowserConfig()):
    ret = {'error': 0, 'msg': '', 'data': None}
    config_dict = config.model_dump()
    logger.info(f"BrowserConfig {config_dict}")
    logger.info(f"keyword {keyword} max_result_items: {max_result_items} skip_existing: {skip_existing}")
    async with AsyncCamoufox(**config_dict) as browser:
        # Create the handler before the try block so the except branch can
        # reference it without risking a NameError.
        search_handler = GoogleSearchHandler(await browser.new_page())
        try:
            kw = await search_handler.process_keyword(keyword, max_result_items=max_result_items, skip_existing=skip_existing)
            if not kw:
                ret['error'] = 1
                html_path = await search_handler.save_current_page(keyword, filename=f"warning_{time.strftime('%Y%m%d_%H%M%S')}")
                logger.warning(f"Keyword task did not complete: {keyword} html_path: {html_path}")
                ret['msg'] = f"Keyword task did not complete: {keyword}"
                ret['data'] = html_path
                return ret
            ret['data'] = kw.model_dump()
            return ret
        except Exception as e:
            html_path = await search_handler.save_current_page(keyword, filename=f"error_{time.strftime('%Y%m%d_%H%M%S')}")
            logger.exception(f"Failed: {str(e)} html_path: {html_path}")
            ret['error'] = 1
            ret['msg'] = f"Failed: {str(e)}"
            ret['data'] = html_path
            return ret


async def aio_main(config: BrowserConfig = BrowserConfig()):
    try:
        async with BrowserCore(config) as core:
            search_handler = GoogleSearchHandler(core.page)
            await search_handler.goto_home_page()
            keywords = [
                'Acampe carinata essential oil',
            ]
            # Keep the browser open for interactive debugging.
            while True:
                await asyncio.sleep(1)
    except Exception as e:
        logger.error(f"Failed: {str(e)}")
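# A minimal batch-driver sketch built on search_keyword(). The pacing delay and the
# choice to continue past failed keywords are illustrative assumptions; a real run
# would likely also rotate BrowserConfig identities after CAPTCHA failures.
async def run_keywords(keywords: list[str], max_result_items: int = 200):
    failed = []
    for kw in keywords:
        ret = await search_keyword(kw, max_result_items=max_result_items, skip_existing=True)
        if ret['error']:
            failed.append(kw)
            logger.warning(f"Keyword failed, continuing: {kw} msg: {ret['msg']}")
        # Crude pacing between keywords; each keyword already gets a fresh browser.
        await asyncio.sleep(5)
    return failed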
def analyze():
    """Re-parse a previously saved results page offline, without a browser."""
    html_file = Path(r"K:\code\upwork\zhang_crawl_bio\output\results\Acampe_rigida_essential_oil\page_1.html")

    class TestPage:
        # Minimal stand-in for a Playwright Page; only .url is read during parsing.
        url = html_file.as_uri()

    search_handler = GoogleSearchHandler(page=TestPage())
    res = search_handler.get_search_result_ele(html_file.read_text(encoding='utf-8'))
    logger.info(f"{res.model_dump_json(indent=4)}")


def main():
    asyncio.run(aio_main())


if __name__ == "__main__":
    main()
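# Example invocations (sketches, not wired into main()):
#   asyncio.run(search_keyword('Acampe carinata essential oil', max_result_items=100))
#   asyncio.run(run_keywords(['Acampe carinata essential oil']))
#   analyze()  # offline re-parse of a saved results page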