瀏覽代碼

完成 Google search 关键词回车搜索

mrh 10 月之前
父節點
當前提交
a78d48b434
共有 5 個文件被更改,包括 259 次插入123 次删除
  1. 65 6
      .clinerules-code
  2. 66 4
      CONVENTIONS.md
  3. 52 0
      architecture.md
  4. 4 1
      readme.md
  5. 72 112
      worker/search_engine/camoufox_broswer.py

+ 65 - 6
.clinerules-code

@@ -1,7 +1,66 @@
-编程注意事项:
-- 编写模块化函数的时候,需要添加适当的注释。
-- 编写代码遵循模块化,高内聚低耦合原则,符合程序设计的基本原则。
-- 将所有函数定义分解成最小化的任务,不要执行多个步骤在同一个函数里面。分解任务函数。必须只能最小化一个任务一个函数。
+# 编程规范
+为了保持程序的通用性、扩展性和兼容性,代码设计应遵循以下原则:
 
-项目说明:
-- 测试模块在 tests 的目录中, `tests\mytest` 是我私人的草稿,不用理会
+1. 模块化设计:将功能分解为独立的模块或类,每个模块或类应专注于单一职责。避免将多个不相关的业务逻辑集中在一个类或函数中。
+
+2. 高内聚低耦合:确保每个模块或类内部的元素紧密相关(高内聚),同时减少模块或类之间的依赖关系(低耦合)。这样可以使代码更易于维护和扩展。
+
+3. 单一职责原则:每个函数或方法应只执行一个最小化的任务。如果一个函数或方法包含多个步骤,应考虑将其分解为多个更小的函数或方法。
+
+4. 按需创建:根据业务需求决定是否创建新的文件、类或函数。如果现有类或函数无法满足新的需求,应考虑创建新的类或函数,而不是在现有代码中添加额外的逻辑。
+
+5. 继承与组合:合理使用继承和组合来增强代码的复用性和扩展性。优先使用组合而非继承,以避免过度复杂的继承层次。
+
+6. 注释与文档:为每个模块、类和函数添加适当的注释,说明其功能、输入输出以及使用场景。这有助于其他开发者理解和使用你的代码。
+
+7. 接口与抽象:定义清晰的接口和抽象基类,以便在不同实现之间保持兼容性。通过接口或抽象类来定义通用的行为,具体的实现可以在子类中完成。
+
+8. 可扩展性:在设计时考虑未来的扩展需求,避免硬编码和过度依赖特定实现。使用配置文件、依赖注入等方式来提高代码的灵活性。
+
+9. 兼容性:在修改或扩展代码时,确保新代码与旧代码兼容,避免破坏现有功能。可以通过版本控制、接口隔离等方式来管理兼容性问题。
+
+# 项目说明:
+- 测试模块在 tests 的目录中, `tests\mytest` 是我私人的草稿,不用理会
+
+## 轮询架构
+我的场景中,Worker 是多机分布部署的,任务执行时间较长,且可能因错误被终止。为了确保任务的可追踪性和实时性,设计一个能够实时监控任务进度、记录失败点并支持任务恢复的系统是非常重要的。以下是针对你的需求的优化设计方案:
+
+---
+
+### **三、推荐方案**
+结合你的需求(实时性、任务恢复、多机部署),推荐以下方案:
+1. **任务进度存储**:
+   - 使用 **中心化数据库(SQL Server)** 实时存储任务进度,确保数据可靠性和任务可恢复性。
+2. **任务监控**:
+   - 使用 **WebSocket 实时推送** 或 **Redis Pub/Sub** 实现任务进度的实时监控,避免轮询的开销。
+3. **错误处理**:
+   - 在数据库中记录任务失败时的页码和错误信息,支持从失败点继续执行。
+
+---
+
+### **四、示例架构**
+```
++----------------+       +-----------------+       +----------------+
+|   Worker 1     |       |   Worker 2      |       |   Worker N     |
+| (更新进度)      |       | (更新进度)       |       | (更新进度)      |
++-------+--------+       +--------+--------+       +--------+-------+
+        |                         |                         |
+        |                         |                         |
+        |                         |                         |
++-------v-------------------------v-------------------------v-------+
+|                            Redis Pub/Sub                          |
+| (实时广播任务进度)                                                 |
++-----------------------------------+-------------------------------+
+                                    |
+                                    |
++-----------------------------------v-------------------------------+
+|                            Master (WebSocket)                     |
+| (接收进度更新,存储到数据库)                                        |
++-----------------------------------+-------------------------------+
+                                    |
+                                    |
++-----------------------------------v-------------------------------+
+|                            SQL Server                             |
+| (存储任务进度、失败信息)                                            |
++-------------------------------------------------------------------+
+```

+ 66 - 4
CONVENTIONS.md

@@ -1,4 +1,66 @@
-编程注意事项:
-- 编写模块化函数的时候,需要添加适当的注释。
-- 编写代码遵循模块化,高内聚低耦合原则,符合程序设计的基本原则。
-- 将所有函数定义分解成最小化的任务,不要执行多个步骤在同一个函数里面。分解任务函数。必须只能最小化一个任务一个函数。
+# 编程规范
+为了保持程序的通用性、扩展性和兼容性,代码设计应遵循以下原则:
+
+1. 模块化设计:将功能分解为独立的模块或类,每个模块或类应专注于单一职责。避免将多个不相关的业务逻辑集中在一个类或函数中。
+
+2. 高内聚低耦合:确保每个模块或类内部的元素紧密相关(高内聚),同时减少模块或类之间的依赖关系(低耦合)。这样可以使代码更易于维护和扩展。
+
+3. 单一职责原则:每个函数或方法应只执行一个最小化的任务。如果一个函数或方法包含多个步骤,应考虑将其分解为多个更小的函数或方法。
+
+4. 按需创建:根据业务需求决定是否创建新的文件、类或函数。如果现有类或函数无法满足新的需求,应考虑创建新的类或函数,而不是在现有代码中添加额外的逻辑。
+
+5. 继承与组合:合理使用继承和组合来增强代码的复用性和扩展性。优先使用组合而非继承,以避免过度复杂的继承层次。
+
+6. 注释与文档:为每个模块、类和函数添加适当的注释,说明其功能、输入输出以及使用场景。这有助于其他开发者理解和使用你的代码。
+
+7. 接口与抽象:定义清晰的接口和抽象基类,以便在不同实现之间保持兼容性。通过接口或抽象类来定义通用的行为,具体的实现可以在子类中完成。
+
+8. 可扩展性:在设计时考虑未来的扩展需求,避免硬编码和过度依赖特定实现。使用配置文件、依赖注入等方式来提高代码的灵活性。
+
+9. 兼容性:在修改或扩展代码时,确保新代码与旧代码兼容,避免破坏现有功能。可以通过版本控制、接口隔离等方式来管理兼容性问题。
+
+# 项目说明:
+- 测试模块在 tests 的目录中, `tests\mytest` 是我私人的草稿,不用理会
+
+## 轮询架构
+我的场景中,Worker 是多机分布部署的,任务执行时间较长,且可能因错误被终止。为了确保任务的可追踪性和实时性,设计一个能够实时监控任务进度、记录失败点并支持任务恢复的系统是非常重要的。以下是针对你的需求的优化设计方案:
+
+---
+
+### **三、推荐方案**
+结合你的需求(实时性、任务恢复、多机部署),推荐以下方案:
+1. **任务进度存储**:
+   - 使用 **中心化数据库(SQL Server)** 实时存储任务进度,确保数据可靠性和任务可恢复性。
+2. **任务监控**:
+   - 使用 **WebSocket 实时推送** 或 **Redis Pub/Sub** 实现任务进度的实时监控,避免轮询的开销。
+3. **错误处理**:
+   - 在数据库中记录任务失败时的页码和错误信息,支持从失败点继续执行。
+
+---
+
+### **四、示例架构**
+```
++----------------+       +-----------------+       +----------------+
+|   Worker 1     |       |   Worker 2      |       |   Worker N     |
+| (更新进度)      |       | (更新进度)       |       | (更新进度)      |
++-------+--------+       +--------+--------+       +--------+-------+
+        |                         |                         |
+        |                         |                         |
+        |                         |                         |
++-------v-------------------------v-------------------------v-------+
+|                            Redis Pub/Sub                          |
+| (实时广播任务进度)                                                 |
++-----------------------------------+-------------------------------+
+                                    |
+                                    |
++-----------------------------------v-------------------------------+
+|                            Master (WebSocket)                     |
+| (接收进度更新,存储到数据库)                                        |
++-----------------------------------+-------------------------------+
+                                    |
+                                    |
++-----------------------------------v-------------------------------+
+|                            SQL Server                             |
+| (存储任务进度、失败信息)                                            |
++-------------------------------------------------------------------+
+```

+ 52 - 0
architecture.md

@@ -1,3 +1,55 @@
+# 轮询架构
+在你的场景中,Worker 是多机分布部署的,任务执行时间较长,且可能因错误被终止。为了确保任务的可追踪性和实时性,设计一个能够实时监控任务进度、记录失败点并支持任务恢复的系统是非常重要的。以下是针对你的需求的优化设计方案:
+
+---
+
+### **三、推荐方案**
+结合你的需求(实时性、任务恢复、多机部署),推荐以下方案:
+1. **任务进度存储**:
+   - 使用 **中心化数据库(SQL Server)** 实时存储任务进度,确保数据可靠性和任务可恢复性。
+2. **任务监控**:
+   - 使用 **WebSocket 实时推送** 或 **Redis Pub/Sub** 实现任务进度的实时监控,避免轮询的开销。
+3. **错误处理**:
+   - 在数据库中记录任务失败时的页码和错误信息,支持从失败点继续执行。
+
+---
+
+### **四、示例架构**
+```
++----------------+       +-----------------+       +----------------+
+|   Worker 1     |       |   Worker 2      |       |   Worker N     |
+| (更新进度)      |       | (更新进度)       |       | (更新进度)      |
++-------+--------+       +--------+--------+       +--------+-------+
+        |                         |                         |
+        |                         |                         |
+        |                         |                         |
++-------v-------------------------v-------------------------v-------+
+|                            Redis Pub/Sub                          |
+| (实时广播任务进度)                                                 |
++-----------------------------------+-------------------------------+
+                                    |
+                                    |
++-----------------------------------v-------------------------------+
+|                            Master (WebSocket)                     |
+| (接收进度更新,存储到数据库)                                        |
++-----------------------------------+-------------------------------+
+                                    |
+                                    |
++-----------------------------------v-------------------------------+
+|                            SQL Server                             |
+| (存储任务进度、失败信息)                                            |
++-------------------------------------------------------------------+
+```
+
+---
+
+### **五、总结**
+- **任务进度存储**:推荐使用中心化数据库(SQL Server)实时存储任务进度,确保数据可靠性和任务可恢复性。
+- **任务监控**:推荐使用 WebSocket 或 Redis Pub/Sub 实现实时进度推送,避免轮询的开销。
+- **错误处理**:在数据库中记录任务失败时的页码和错误信息,支持从失败点继续执行。
+
+通过以上设计,你的系统将具备高实时性、高可靠性和任务可恢复性,能够有效管理多机分布部署的 Worker 节点。
+
 # 架构
 
 ### **一、Master 节点的核心职责与实现**

+ 4 - 1
readme.md

@@ -1,6 +1,9 @@
 
 
-
+# 测试
+```shell
+python -m pytest tests/test_google_search.py -v
+```
 
 # 工具
 

+ 72 - 112
worker/search_engine/camoufox_broswer.py

@@ -1,74 +1,53 @@
 from camoufox import Camoufox
 from camoufox.server import launch_server
 from camoufox.async_api import AsyncCamoufox
-from playwright.async_api import Browser,Page
+from playwright.async_api import Browser, Page
 import asyncio
 import signal
 import os
 import datetime
-from typing import Optional, Dict
+from typing import Optional, Dict, Type, Protocol
 import logging
 from pydantic import BaseModel
-from typing import Optional, Dict
 from config.settings import OUTPUT_DIR, WORK_DIR
 
+# ------------------- Base Interfaces -------------------
+class IBrowserCore(Protocol):
+    """浏览器核心操作抽象基类"""
+    async def initialize(self): ...
+    async def close(self): ...
+    async def take_screenshot(self, filename: str) -> str: ...
+    async def get_page_info(self) -> dict: ...
+
+class ISearchHandler(Protocol):
+    """搜索操作抽象接口"""
+    async def search(self, query: str) -> dict: ...
+    async def next_page(self) -> dict: ...
+    async def validate_search_result(self) -> bool: ...
+
+# ------------------- Core Implementation -------------------
 class BrowserConfig(BaseModel):
-    """浏览器配置参数模型"""
+    """浏览器基础配置模型"""
     headless: bool = False
     geoip: bool = True
     proxy: Optional[Dict] = {'server': 'http://localhost:1881'}
-    init_url: str = "https://www.browserscan.net"
-    screenshot_dir: str = OUTPUT_DIR / "screenshots" 
+    init_url: str = "about:blank"
+    screenshot_dir: str = OUTPUT_DIR / "screenshots"
 
-class PageOperations:
-    """封装页面交互操作"""
-    def __init__(self, page: Page, config: BrowserConfig):
-        self.page = page
-        self.config = config
-
-    async def search_element(self, selector: str, timeout: float = 30.0):
-        """等待并返回指定元素"""
-        return await self.page.wait_for_selector(selector, timeout=timeout)
-
-    async def click_element(self, selector: str):
-        """点击指定选择器的元素"""
-        element = await self.search_element(selector)
-        await element.click()
-
-    async def take_screenshot(self, filename: str):
-        """带时间戳的截图保存"""
-        os.makedirs(self.config.screenshot_dir, exist_ok=True)
-        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
-        path = os.path.join(self.config.screenshot_dir, f"{timestamp}_{filename}")
-        await self.page.screenshot(path=path, full_page=True)
-        return path
-
-    async def fill_input(self, selector: str, text: str):
-        """在指定输入框填充文本"""
-        element = await self.search_element(selector)
-        await element.fill(text)
-
-    async def press_enter(self):
-        """执行键盘回车操作"""
-        await self.page.keyboard.press("Enter")
-
-class BrowserManager:
-    """全局浏览器会话管理器(单例模式)"""
+class BrowserCore(IBrowserCore):
+    """浏览器核心功能实现(仅管理浏览器生命周期和基础操作)"""
     _instance = None
     _lock = asyncio.Lock()
     
-    def __init__(self, config: BrowserConfig = BrowserConfig()):
-        if not hasattr(self, '_initialized'):
-            self.config = config
-            self.browser: Browser = None
-            self.page: Page = None
-            self.page_ops: PageOperations = None
-            self.status: str = 'stopped'  # running/stopped/error
-            self.last_activity: datetime.datetime = None
-            self._initialized = True
+    def __init__(self, config: BrowserConfig):
+        self.config = config
+        self.browser: Browser = None
+        self.page: Page = None
+        self.status: str = 'stopped'
+        self.last_activity: datetime.datetime = None
 
     @classmethod
-    async def get_instance(cls, config: BrowserConfig = BrowserConfig()) -> "BrowserManager":
+    async def get_instance(cls, config: BrowserConfig = BrowserConfig()) -> "BrowserCore":
         """获取单例实例"""
         if not cls._instance:
             async with cls._lock:
@@ -87,7 +66,6 @@ class BrowserManager:
             ).__aenter__()
             self.page = await self.browser.new_page()
             await self.page.goto(self.config.init_url)
-            self.page_ops = PageOperations(self.page, self.config)
             self.status = 'running'
             self.last_activity = datetime.datetime.now()
             logging.info(f"Browser session initialized | URL: {self.page.url}")
@@ -103,75 +81,59 @@ class BrowserManager:
             self.status = 'stopped'
             logging.info("Browser session closed")
 
-    async def ensure_active_session(self):
-        """确保会话有效性"""
-        if self.status != 'running' or self.page.is_closed():
-            await self.close()
-            await self.initialize()
+    async def goto(self, url: str):
+        """导航到指定URL"""
+        await self.page.goto(url)
+        self.last_activity = datetime.datetime.now()
+
+    async def take_screenshot(self, filename: str) -> str:
+        """截图操作"""
+        os.makedirs(self.config.screenshot_dir, exist_ok=True)
+        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
+        path = os.path.join(self.config.screenshot_dir, f"{timestamp}_{filename}")
+        await self.page.screenshot(path=path, full_page=True)
+        return path
 
-    # API操作方法
-    async def search(self, query: str, engine: str = "google"):
+    async def get_page_info(self) -> dict:
+        pass
+
+# ------------------- Search Engine Implementation -------------------
+class GoogleSearchHandler(ISearchHandler):
+    """搜索引擎专用处理器(可继承扩展其他引擎)"""
+    def __init__(self, browser_core: BrowserCore):
+        self.core = browser_core
+        self.page = self.core.page
+    async def goto_home_page(self):
+        url = "https://www.google.com"
+        if self.page.url != url:
+            await self.page.goto(url)
+    async def search(self, query: str) -> dict:
         """执行搜索操作"""
         try:
-            await self.ensure_active_session()
-            
-            # 导航到搜索引擎
-            search_url = f"https://www.{engine}.com/search?q={query}"
-            await self.page.goto(search_url)
-            
-            # 等待搜索结果加载
-            await self.page.wait_for_selector('#search', timeout=5000)
-            self.last_activity = datetime.datetime.now()
-            
-            # 记录截图
-            screenshot_path = await self.page_ops.take_screenshot(f"search_{query}.png")
-            return {"status": "success", "screenshot": screenshot_path}
-            
+            await self.goto_home_page()
+            # 使用 aria-label 定位 textarea 并填入数据
+            await self.page.fill('textarea[aria-label="Search"]', query)
+            # 模拟按下回车键
+            await self.page.press('textarea[aria-label="Search"]', 'Enter')        
+            return await self.page.content()
         except Exception as e:
-            self.status = 'error'
             logging.error(f"Search failed: {str(e)}")
             return {"status": "error", "message": str(e)}
 
-    async def get_page_info(self):
-        """获取当前页面信息"""
-        try:
-            await self.ensure_active_session()
-            return {
-                "url": self.page.url,
-                "title": await self.page.title(),
-                "content": await self.page.content(),
-                "timestamp": datetime.datetime.now().isoformat()
-            }
-        except Exception as e:
-            logging.error(f"Get page info failed: {str(e)}")
-            return {"status": "error", "message": str(e)}
+    async def next_page(self) -> dict:
+        pass
 
-    async def next_page(self):
-        """跳转到下一页"""
-        try:
-            await self.ensure_active_session()
-            next_btn = await self.page_ops.search_element('a:has-text("Next")')
-            await next_btn.click()
-            await self.page.wait_for_load_state('networkidle')
-            self.last_activity = datetime.datetime.now()
-            return {"status": "success", "new_url": self.page.url}
-        except Exception as e:
-            logging.error(f"Next page failed: {str(e)}")
-            return {"status": "error", "message": str(e)}
 
-async def aio_main(config: BrowserConfig = BrowserConfig(init_url="https://www.google.com")):
+# ------------------- API Service -------------------
+async def aio_main(config: BrowserConfig = BrowserConfig()):
     """API服务主循环"""
-    manager = await BrowserManager.get_instance(config)
+    core = await BrowserCore.get_instance(config)
+    search_handler = GoogleSearchHandler(core)
     
     try:
-        logging.info(f"API服务已启动 | 初始页面: {manager.page.url}")
-        while manager.status == 'running':
-            # 保持心跳并检查会话状态
-            if (datetime.datetime.now() - manager.last_activity).total_seconds() > 300:
-                logging.info("检测到会话超时,重新初始化浏览器...")
-                await manager.close()
-                await manager.initialize()
-                
+        await search_handler.search('python')
+        logging.info(f"API服务已启动 | 初始页面: {search_handler.page.url}")
+        while core.status == 'running':
             await asyncio.sleep(5)
             
     except KeyboardInterrupt:
@@ -179,17 +141,15 @@ async def aio_main(config: BrowserConfig = BrowserConfig(init_url="https://www.g
     except Exception as e:
         logging.error(f"API服务异常: {str(e)}")
     finally:
-        await manager.close()
+        await core.close()
         logging.info("API服务已停止")
 
 def main():
-    # 初始化并保持浏览器会话
     asyncio.run(aio_main())
 
 if __name__ == "__main__":
-    # 启动时初始化配置
     config = BrowserConfig(
         headless=True,
         init_url="https://www.google.com"
     )
-    main()
+    main()