from camoufox import Camoufox
from camoufox.server import launch_server
from camoufox.async_api import AsyncCamoufox
from playwright.async_api import Browser, Page
import asyncio
import signal
import os
import datetime
from typing import Optional, Dict, Type, Protocol, Union
import logging
from pydantic import BaseModel
from config.settings import OUTPUT_DIR, WORK_DIR


# ------------------- Base Interfaces -------------------
class IBrowserCore(Protocol):
    """Structural interface for core browser operations."""

    async def initialize(self): ...

    async def close(self): ...

    async def take_screenshot(self, filename: str) -> str: ...

    async def get_page_info(self) -> dict: ...


class ISearchHandler(Protocol):
    """Structural interface for search operations."""

    async def search(self, query: str) -> dict: ...

    async def next_page(self) -> dict: ...

    async def validate_search_result(self) -> bool: ...


# ------------------- Core Implementation -------------------
class BrowserConfig(BaseModel):
    """Basic browser configuration model."""

    headless: bool = False
    geoip: bool = True
    proxy: Optional[Dict] = {'server': 'http://localhost:1881'}
    init_url: str = "about:blank"
    # os.path.join accepts both str and path-like inputs and always returns a
    # str, so the default really has the declared field type.
    screenshot_dir: str = os.path.join(OUTPUT_DIR, "screenshots")


class BrowserCore(IBrowserCore):
    """Browser core: manages the browser lifecycle and basic page operations.

    Use ``await BrowserCore.get_instance(config)`` to obtain the shared
    instance; ``close()`` releases it so a later call can start a new one.
    """

    _instance: Optional["BrowserCore"] = None
    # Created lazily inside get_instance(): a Lock built at import time is
    # bound to the wrong event loop on Python 3.8/3.9 when asyncio.run()
    # later creates a fresh loop.
    _lock: Optional[asyncio.Lock] = None

    def __init__(self, config: BrowserConfig):
        self.config = config
        # The AsyncCamoufox context manager that owns the browser; kept so
        # that close() can exit the manager itself.
        self._camoufox: Optional[AsyncCamoufox] = None
        self.browser: Optional[Browser] = None
        self.page: Optional[Page] = None
        self.status: str = 'stopped'
        self.last_activity: Optional[datetime.datetime] = None

    @classmethod
    async def get_instance(
        cls, config: Optional[BrowserConfig] = None
    ) -> "BrowserCore":
        """Return the shared instance, creating and initializing it on first use.

        Args:
            config: Configuration used only when a new instance has to be
                created; ``None`` means a default ``BrowserConfig()``.

        Raises:
            Exception: whatever ``initialize()`` raised. In that case no
                instance is stored, so the next call retries from scratch.
        """
        if cls._instance is None:
            if cls._lock is None:
                # No await between the check and the assignment, so two
                # tasks cannot both create a lock.
                cls._lock = asyncio.Lock()
            async with cls._lock:
                if cls._instance is None:
                    instance = cls(config if config is not None else BrowserConfig())
                    await instance.initialize()
                    # Publish only after a successful start; a failed
                    # instance must not be handed to later callers.
                    cls._instance = instance
        return cls._instance

    async def initialize(self):
        """Launch the browser, open a page and navigate to ``config.init_url``.

        Sets ``status`` to ``'running'`` on success. On failure sets it to
        ``'error'``, releases whatever was already started, and re-raises.
        """
        self._camoufox = AsyncCamoufox(
            headless=self.config.headless,
            geoip=self.config.geoip,
            proxy=self.config.proxy,
        )
        try:
            self.browser = await self._camoufox.__aenter__()
            self.page = await self.browser.new_page()
            await self.page.goto(self.config.init_url)
            self.status = 'running'
            self.last_activity = datetime.datetime.now()
            logging.info(f"Browser session initialized | URL: {self.page.url}")
        except Exception as e:
            self.status = 'error'
            logging.error(f"Browser initialization failed: {str(e)}")
            try:
                # Best effort: do not leave a half-started browser behind.
                await self._release()
            except Exception as cleanup_error:
                logging.warning(
                    f"Browser cleanup after failed initialization failed: {str(cleanup_error)}"
                )
            raise

    async def _release(self):
        """Drop browser/page references and exit the camoufox manager, if any."""
        manager, self._camoufox = self._camoufox, None
        self.browser = None
        self.page = None
        if manager is not None:
            # Exiting the AsyncCamoufox manager (rather than only the Browser
            # it returned) also shuts down the Playwright driver it started.
            await manager.__aexit__(None, None, None)

    async def close(self):
        """Close the browser and release the shared instance slot.

        Safe to call more than once; later calls find nothing to release.
        """
        try:
            await self._release()
        finally:
            self.status = 'stopped'
            owner = type(self)
            if owner._instance is self:
                # Let the next get_instance() start a fresh browser instead
                # of returning this closed one.
                owner._instance = None
        logging.info("Browser session closed")

    async def goto(self, url: str):
        """Navigate the page to ``url`` and record the activity time."""
        await self.page.goto(url)
        self.last_activity = datetime.datetime.now()

    async def take_screenshot(self, filename: str) -> str:
        """Save a full-page screenshot and return the path it was written to.

        The file is stored in ``config.screenshot_dir`` (created if missing)
        as ``<YYYYmmdd_HHMMSS>_<filename>``.
        """
        os.makedirs(self.config.screenshot_dir, exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        path = os.path.join(self.config.screenshot_dir, f"{timestamp}_{filename}")
        await self.page.screenshot(path=path, full_page=True)
        return path

    async def get_page_info(self) -> dict:
        # TODO: not implemented yet; currently returns None.
        pass


# ------------------- Search Engine Implementation -------------------
class GoogleSearchHandler(ISearchHandler):
    """Google-specific search handler (subclass to support other engines)."""

    def __init__(self, browser_core: BrowserCore):
        self.core = browser_core
        # Captured once: the handler keeps using the page that the core had
        # when the handler was constructed.
        self.page = self.core.page

    async def goto_home_page(self):
        """Navigate to the Google home page unless the page is already there."""
        url = "https://www.google.com"
        # The browser reports an origin-only URL with a trailing slash, so
        # strip it before comparing; otherwise the check never matches.
        if self.page.url.rstrip("/") != url:
            await self.page.goto(url)

    async def search(self, query: str) -> Union[str, dict]:
        """Run a search for ``query`` from the Google home page.

        Returns:
            The HTML of the resulting page on success, or
            ``{"status": "error", "message": ...}`` if any step raised.
        """
        try:
            await self.goto_home_page()
            # Locate the search textarea by its aria-label and type the query.
            await self.page.fill('textarea[aria-label="Search"]', query)
            # Submit by simulating the Enter key.
            await self.page.press('textarea[aria-label="Search"]', 'Enter')
            # Let the results page finish loading before reading its HTML.
            await self.page.wait_for_load_state()
            return await self.page.content()
        except Exception as e:
            logging.error(f"Search failed: {str(e)}")
            return {"status": "error", "message": str(e)}

    async def next_page(self) -> dict:
        # TODO: not implemented yet; currently returns None.
        pass


# ------------------- API Service -------------------
async def aio_main(config: Optional[BrowserConfig] = None):
    """Service main loop: start the browser, run a search, idle until stopped.

    Args:
        config: Browser configuration; ``None`` means a default
            ``BrowserConfig()``.
    """
    core = await BrowserCore.get_instance(config)
    search_handler = GoogleSearchHandler(core)

    try:
        await search_handler.search('python')
        logging.info(f"API服务已启动 | 初始页面: {search_handler.page.url}")
        while core.status == 'running':
            await asyncio.sleep(5)
    except KeyboardInterrupt:
        logging.info("接收到终止信号,关闭浏览器...")
    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl-C can reach this coroutine as a
        # cancellation rather than as KeyboardInterrupt; log and propagate.
        logging.info("接收到终止信号,关闭浏览器...")
        raise
    except Exception as e:
        logging.error(f"API服务异常: {str(e)}")
    finally:
        await core.close()
        logging.info("API服务已停止")


def main(config: Optional[BrowserConfig] = None):
    """Synchronous entry point: run ``aio_main`` with the given configuration."""
    asyncio.run(aio_main(config))


if __name__ == "__main__":
    # Without a configured handler the INFO-level messages above are dropped.
    logging.basicConfig(level=logging.INFO)
    config = BrowserConfig(
        headless=True,
        init_url="https://www.google.com"
    )
    main(config)