| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- from camoufox import Camoufox
- from camoufox.server import launch_server
- from camoufox.async_api import AsyncCamoufox
- from playwright.async_api import Browser, Page
- import asyncio
- import signal
- import os
- import datetime
- from typing import Optional, Dict, Type, Protocol
- import logging
- from pydantic import BaseModel
- from config.settings import OUTPUT_DIR, WORK_DIR
- # ------------------- Base Interfaces -------------------
class IBrowserCore(Protocol):
    """Interface for core browser operations.

    All members are coroutines; the bodies here are stubs that return
    ``None`` when called directly.
    """

    async def initialize(self):
        """Initialize the browser instance."""

    async def close(self):
        """Close the browser instance."""

    async def take_screenshot(self, filename: str) -> str:
        """Take a screenshot for ``filename`` and return a string result."""

    async def get_page_info(self) -> dict:
        """Return page information as a dict."""
class ISearchHandler(Protocol):
    """Interface for search operations.

    All members are coroutines; the bodies here are stubs that return
    ``None`` when called directly.
    """

    async def search(self, query: str) -> dict:
        """Run a search for ``query`` and return the outcome as a dict."""

    async def next_page(self) -> dict:
        """Move to the next page of results and return the outcome as a dict."""

    async def validate_search_result(self) -> bool:
        """Return whether the current search result is valid."""
- # ------------------- Core Implementation -------------------
class BrowserConfig(BaseModel):
    """Basic browser configuration model."""

    # Forwarded to AsyncCamoufox as ``headless`` by BrowserCore.initialize().
    headless: bool = False
    # Forwarded to AsyncCamoufox as ``geoip``.
    geoip: bool = True
    # Forwarded to AsyncCamoufox as ``proxy``.
    # NOTE(review): None presumably disables the proxy - confirm against Camoufox.
    proxy: Optional[Dict] = {'server': 'http://localhost:1881'}
    # First URL the browser page navigates to after start-up.
    init_url: str = "about:blank"
    # Directory screenshots are written to. The default is built with the
    # ``/`` operator on OUTPUT_DIR and would therefore not be a ``str``;
    # convert it explicitly so the stored value matches the annotation.
    screenshot_dir: str = str(OUTPUT_DIR / "screenshots")
class BrowserCore(IBrowserCore):
    """Browser core implementation: owns the browser lifecycle and basic page operations.

    Normally obtained through :meth:`get_instance`, which keeps one shared
    instance per class.
    """

    _instance: Optional["BrowserCore"] = None
    # Created on first use (inside a running event loop) instead of at import
    # time, so the lock is never constructed against a loop that
    # ``asyncio.run`` will not be using.
    _lock: Optional[asyncio.Lock] = None

    def __init__(self, config: BrowserConfig):
        """Store the configuration; no browser is started until initialize()."""
        self.config = config
        self.browser: Optional[Browser] = None
        self.page: Optional[Page] = None
        self.status: str = 'stopped'
        self.last_activity: Optional[datetime.datetime] = None
        # The AsyncCamoufox context manager that produced ``self.browser``.
        # It has to be kept so close() can exit it; exiting only the Browser
        # object would leave the Playwright driver it started running.
        self._camoufox: Optional[AsyncCamoufox] = None

    @classmethod
    async def get_instance(cls, config: Optional[BrowserConfig] = None) -> "BrowserCore":
        """Return the shared instance, creating and initializing it on first use.

        Args:
            config: Configuration for the instance. Only used when the
                instance is created; ignored if one already exists. A fresh
                ``BrowserConfig()`` is built when omitted.

        Raises:
            Exception: whatever initialize() raises. In that case nothing is
                cached, so a later call starts a new attempt.
        """
        instance = cls._instance
        if instance is None:
            if cls._lock is None:
                cls._lock = asyncio.Lock()
            async with cls._lock:
                # Re-check: another task may have finished creation while
                # this one was waiting for the lock.
                instance = cls._instance
                if instance is None:
                    instance = cls(config if config is not None else BrowserConfig())
                    await instance.initialize()
                    # Publish only after a successful start so a failed
                    # instance is never handed out.
                    cls._instance = instance
        return instance

    async def initialize(self):
        """Start the browser, open a page and navigate to ``config.init_url``.

        Raises:
            Exception: any start-up error is logged, ``status`` is set to
                ``'error'``, already-started resources are released, and the
                original error is re-raised.
        """
        manager = AsyncCamoufox(
            headless=self.config.headless,
            geoip=self.config.geoip,
            proxy=self.config.proxy
        )
        try:
            self.browser = await manager.__aenter__()
            self._camoufox = manager
            self.page = await self.browser.new_page()
            await self.page.goto(self.config.init_url)
        except Exception as e:
            self.status = 'error'
            logging.error(f"Browser initialization failed: {str(e)}")
            # Best-effort cleanup; it must not mask the original failure.
            try:
                await self._release_session()
            except Exception as cleanup_error:
                logging.warning(
                    "Cleanup after failed browser initialization also failed: %s",
                    cleanup_error,
                )
            raise
        self.status = 'running'
        self.last_activity = datetime.datetime.now()
        logging.info(f"Browser session initialized | URL: {self.page.url}")

    async def _release_session(self):
        """Tear down whatever browser resources this instance currently holds."""
        manager, self._camoufox = self._camoufox, None
        browser, self.browser = self.browser, None
        self.page = None
        if manager is not None:
            await manager.__aexit__(None, None, None)
        elif browser is not None:
            # No manager recorded (browser was assigned from outside).
            await browser.__aexit__(None, None, None)

    async def close(self):
        """Close the browser session.

        Safe to call repeatedly. Also clears the shared instance when it is
        this object, so get_instance() can start a new session afterwards.
        """
        had_session = self.browser is not None or self._camoufox is not None
        try:
            await self._release_session()
        finally:
            self.status = 'stopped'
            owner = type(self)
            if owner._instance is self:
                owner._instance = None
        if had_session:
            logging.info("Browser session closed")

    async def goto(self, url: str):
        """Navigate the page to ``url`` and record the activity time."""
        await self.page.goto(url)
        self.last_activity = datetime.datetime.now()

    async def take_screenshot(self, filename: str) -> str:
        """Save a full-page screenshot and return the path it was written to.

        The file is placed in ``config.screenshot_dir`` (created if missing)
        and named ``<YYYYmmdd_HHMMSS>_<filename>``.
        """
        os.makedirs(self.config.screenshot_dir, exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        path = os.path.join(self.config.screenshot_dir, f"{timestamp}_{filename}")
        await self.page.screenshot(path=path, full_page=True)
        return path

    async def get_page_info(self) -> dict:
        """Return a snapshot of the current page and session state.

        Returns:
            A dict with the keys ``url``, ``title``, ``status`` and
            ``last_activity``. ``url`` and ``title`` are ``None`` when no
            page is open.
        """
        info = {
            "url": None,
            "title": None,
            "status": self.status,
            "last_activity": self.last_activity,
        }
        if self.page is not None:
            info["url"] = self.page.url
            info["title"] = await self.page.title()
        return info
- # ------------------- Search Engine Implementation -------------------
class GoogleSearchHandler(ISearchHandler):
    """Search handler for Google (other engines can follow the same shape)."""

    HOME_URL = "https://www.google.com"
    # NOTE(review): the aria-label text is presumably locale dependent, and
    # BrowserConfig enables geoip by default - confirm this selector matches
    # for the locales actually used.
    SEARCH_BOX_SELECTOR = 'textarea[aria-label="Search"]'

    def __init__(self, browser_core: BrowserCore):
        """Bind the handler to ``browser_core`` and the page it currently has."""
        self.core = browser_core
        self.page = self.core.page

    async def goto_home_page(self):
        """Navigate to the Google home page unless the page is already there.

        A trailing slash on the current URL is ignored for the comparison;
        without that, the check against ``HOME_URL`` (which has no trailing
        slash) could not match a loaded home page. Navigation goes through
        the core so its ``last_activity`` is updated.
        """
        if self.page.url.rstrip("/") != self.HOME_URL:
            await self.core.goto(self.HOME_URL)

    async def search(self, query: str) -> dict:
        """Type ``query`` into the search box, submit it and capture the page.

        Returns:
            On success ``{"status": "success", "query", "url", "content"}``
            where ``content`` is the page HTML. On any failure
            ``{"status": "error", "message"}``; the error is logged, not
            raised.
        """
        try:
            await self.goto_home_page()
            # Locate the search textarea and fill in the query.
            await self.page.fill(self.SEARCH_BOX_SELECTOR, query)
            # Submit by pressing Enter.
            await self.page.press(self.SEARCH_BOX_SELECTOR, 'Enter')
            # Let the page finish loading before its HTML is read.
            await self.page.wait_for_load_state()
            content = await self.page.content()
        except Exception as e:
            logging.error(f"Search failed: {str(e)}")
            return {"status": "error", "message": str(e)}
        return {
            "status": "success",
            "query": query,
            "url": self.page.url,
            "content": content,
        }

    async def next_page(self) -> dict:
        """Not implemented yet; currently returns ``None``."""
        # TODO: implement result pagination.
        return None
- # ------------------- API Service -------------------
async def aio_main(config: Optional[BrowserConfig] = None):
    """Main loop of the API service.

    Starts (or reuses) the shared browser, runs one search for ``'python'``,
    then polls every 5 seconds until the core's status is no longer
    ``'running'``. The browser is always closed on the way out.

    Args:
        config: Browser configuration; a fresh ``BrowserConfig()`` is built
            per call when omitted.
    """
    if config is None:
        config = BrowserConfig()
    core = await BrowserCore.get_instance(config)
    search_handler = GoogleSearchHandler(core)

    try:
        await search_handler.search('python')
        logging.info(f"API服务已启动 | 初始页面: {search_handler.page.url}")
        while core.status == 'running':
            await asyncio.sleep(5)

    except KeyboardInterrupt:
        logging.info("接收到终止信号,关闭浏览器...")
    except asyncio.CancelledError:
        # The task can be stopped by cancellation instead of a
        # KeyboardInterrupt raised in this frame. Log the same message and
        # re-raise so the cancellation is not swallowed.
        logging.info("接收到终止信号,关闭浏览器...")
        raise
    except Exception as e:
        logging.error(f"API服务异常: {str(e)}")
    finally:
        await core.close()
        logging.info("API服务已停止")
def main(config: Optional[BrowserConfig] = None):
    """Run the service loop to completion in a new event loop.

    Args:
        config: Browser configuration to run with; a default
            ``BrowserConfig()`` is used when omitted.
    """
    if config is None:
        config = BrowserConfig()
    asyncio.run(aio_main(config))


if __name__ == "__main__":
    config = BrowserConfig(
        headless=True,
        init_url="https://www.google.com"
    )
    # Pass the configuration on; it was previously built and then ignored.
    main(config)
|