Bläddra i källkod

使用单例模式便于外部调用,不过会话逻辑是错误的,它会到时间莫明重启,我只需要在程序运行期间浏览器也保持运行即可。如果外部API调用发现浏览器被关闭再次重启就好,而且状态似乎也不需要额外定义,playwright 应该有相关的方法检测浏览器或页面是否存活吧

mrh 1 år sedan
förälder
incheckning
21bfc2c2a4
1 ändrade filer med 131 tillägg och 39 borttagningar
  1. 131 39
      worker/search_engine/camoufox_broswer.py

+ 131 - 39
worker/search_engine/camoufox_broswer.py

@@ -52,52 +52,144 @@ class PageOperations:
         """执行键盘回车操作"""
         await self.page.keyboard.press("Enter")
 
-class CamoufoxManager:
-    """管理浏览器实例的生命周期"""
+class BrowserManager:
+    """全局浏览器会话管理器(单例模式)"""
+    _instance = None
+    _lock = asyncio.Lock()
+    
     def __init__(self, config: BrowserConfig = BrowserConfig()):
-        self.config = config
-        self.browser:Browser = None
-        self.page:Page = None
-        self.page_ops: PageOperations = None
-
-    async def __aenter__(self):
-        """异步上下文管理器入口"""
-        self.browser = await AsyncCamoufox(
-            headless=self.config.headless,
-            geoip=self.config.geoip,
-            proxy=self.config.proxy
-        ).__aenter__()
-        # 自动创建新页面并导航到初始URL
-        self.page = await self.browser.new_page()
-        await self.page.goto(self.config.init_url)
-        self.page_ops = PageOperations(self.page, self.config)
-        logging.info(f"Browser initialized with page: {self.page.url}")
-        return self
-
-    async def __aexit__(self, exc_type, exc, tb):
-        """异步上下文管理器退出"""
-        await self.browser.__aexit__(exc_type, exc, tb)
+        if not hasattr(self, '_initialized'):
+            self.config = config
+            self.browser: Browser = None
+            self.page: Page = None
+            self.page_ops: PageOperations = None
+            self.status: str = 'stopped'  # running/stopped/error
+            self.last_activity: datetime.datetime = None
+            self._initialized = True
 
-async def aio_main(config: BrowserConfig = BrowserConfig(init_url="https://www.google.com")):
-    """新版主运行函数(测试自动页面创建)"""
-    async with CamoufoxManager(config) as manager:
-        logging.info(f"当前页面URL: {manager.page.url}")
-        
-        # 保持页面存活直到手动关闭
+    @classmethod
+    async def get_instance(cls, config: BrowserConfig = BrowserConfig()) -> "BrowserManager":
+        """获取单例实例"""
+        if not cls._instance:
+            async with cls._lock:
+                if not cls._instance:
+                    cls._instance = cls(config)
+                    await cls._instance.initialize()
+        return cls._instance
+
+    async def initialize(self):
+        """初始化浏览器实例"""
+        try:
+            self.browser = await AsyncCamoufox(
+                headless=self.config.headless,
+                geoip=self.config.geoip,
+                proxy=self.config.proxy
+            ).__aenter__()
+            self.page = await self.browser.new_page()
+            await self.page.goto(self.config.init_url)
+            self.page_ops = PageOperations(self.page, self.config)
+            self.status = 'running'
+            self.last_activity = datetime.datetime.now()
+            logging.info(f"Browser session initialized | URL: {self.page.url}")
+        except Exception as e:
+            self.status = 'error'
+            logging.error(f"Browser initialization failed: {str(e)}")
+            raise
+
+    async def close(self):
+        """关闭浏览器实例"""
+        if self.browser:
+            await self.browser.__aexit__(None, None, None)
+            self.status = 'stopped'
+            logging.info("Browser session closed")
+
+    async def ensure_active_session(self):
+        """确保会话有效性"""
+        if self.status != 'running' or self.page.is_closed():
+            await self.close()
+            await self.initialize()
+
+    # API操作方法
+    async def search(self, query: str, engine: str = "google"):
+        """执行搜索操作"""
+        try:
+            await self.ensure_active_session()
+            
+            # 导航到搜索引擎
+            search_url = f"https://www.{engine}.com/search?q={query}"
+            await self.page.goto(search_url)
+            
+            # 等待搜索结果加载
+            await self.page.wait_for_selector('#search', timeout=5000)
+            self.last_activity = datetime.datetime.now()
+            
+            # 记录截图
+            screenshot_path = await self.page_ops.take_screenshot(f"search_{query}.png")
+            return {"status": "success", "screenshot": screenshot_path}
+            
+        except Exception as e:
+            self.status = 'error'
+            logging.error(f"Search failed: {str(e)}")
+            return {"status": "error", "message": str(e)}
+
+    async def get_page_info(self):
+        """获取当前页面信息"""
         try:
-            while not manager.page.is_closed():
-                await asyncio.sleep(1)
-        except KeyboardInterrupt:
-            logging.info("接收到终止信号,关闭浏览器...")
-        finally:
-            await manager.page.close()
-            logging.info("测试完成")
+            await self.ensure_active_session()
+            return {
+                "url": self.page.url,
+                "title": await self.page.title(),
+                "content": await self.page.content(),
+                "timestamp": datetime.datetime.now().isoformat()
+            }
+        except Exception as e:
+            logging.error(f"Get page info failed: {str(e)}")
+            return {"status": "error", "message": str(e)}
 
+    async def next_page(self):
+        """跳转到下一页"""
+        try:
+            await self.ensure_active_session()
+            next_btn = await self.page_ops.search_element('a:has-text("Next")')
+            await next_btn.click()
+            await self.page.wait_for_load_state('networkidle')
+            self.last_activity = datetime.datetime.now()
+            return {"status": "success", "new_url": self.page.url}
+        except Exception as e:
+            logging.error(f"Next page failed: {str(e)}")
+            return {"status": "error", "message": str(e)}
+
+async def aio_main(config: BrowserConfig = BrowserConfig(init_url="https://www.google.com")):
+    """API服务主循环"""
+    manager = await BrowserManager.get_instance(config)
+    
+    try:
+        logging.info(f"API服务已启动 | 初始页面: {manager.page.url}")
+        while manager.status == 'running':
+            # 保持心跳并检查会话状态
+            if (datetime.datetime.now() - manager.last_activity).total_seconds() > 300:
+                logging.info("检测到会话超时,重新初始化浏览器...")
+                await manager.close()
+                await manager.initialize()
+                
+            await asyncio.sleep(5)
+            
+    except KeyboardInterrupt:
+        logging.info("接收到终止信号,关闭浏览器...")
+    except Exception as e:
+        logging.error(f"API服务异常: {str(e)}")
+    finally:
+        await manager.close()
+        logging.info("API服务已停止")
 
-async def task():
-    pass
 def main():
+    # 初始化并保持浏览器会话
     asyncio.run(aio_main())
 
 if __name__ == "__main__":
+    # 启动时初始化配置
+    config = BrowserConfig(
+        headless=True,
+        init_url="https://www.google.com"
+    )
     main()