mrh committed 9 months ago
parent commit bac07d3c6e

+ 7 - 4
ui/backend/config.yaml

@@ -17,24 +17,27 @@ sub:
       startup: true
     9662:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9662.yaml
-      name: "\u81EA\u52A8\u9009\u62E9"
+      name: "\U0001F1F8\U0001F1EC\u4E9A\u9A6C\u900A\u65B0\u52A0\u57612"
       port: 9662
       startup: true
     9664:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9664.yaml
-      name: "\u81EA\u52A8\u9009\u62E9"
+      name: "\U0001F1F3\U0001F1F1\u8377\u5170Eygelshoven | BT\u4E0B\u8F7D-0.1\u500D\
+        \u7387"
       port: 9664
       startup: true
     9666:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9666.yaml
-      name: "\u81EA\u52A8\u9009\u62E9"
+      name: "\U0001F1EB\U0001F1F7\u6CD5\u56FD\u9A6C\u8D5B"
       port: 9666
       startup: true
     9668:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9668.yaml
-      name: "\u81EA\u52A8\u9009\u62E9"
+      name: "\U0001F1F3\U0001F1F1\u8377\u5170\u963F\u59C6\u65AF\u7279\u4E39"
       port: 9668
       startup: true
+  redis_url: redis://localhost:6379/8
   start_port: 9660
   temp_dir: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp
   url: https://www.yfjc.xyz/api/v1/client/subscribe?token=b74f2207492053926f7511a8e474048f
+worker_backend_py: G:\code\upwork\zhang_crawl_bio\worker\api\worker_server.py
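
Two new keys land in this config: redis_url (mirrored by the new Sub.redis_url field in ui/backend/utils/config.py) and worker_backend_py. A minimal sketch of how a backend component might open a client from the configured URL; it reuses the config object that utils/config.py already exposes, while the helper name itself is illustrative and not part of this commit:

    import redis.asyncio as redis_async

    from utils.config import config  # the config object loaded from ui/backend/config.yaml

    def make_redis_client() -> "redis_async.Redis":
        # sub.redis_url points at redis://localhost:6379/8 (database 8) in this commit
        return redis_async.from_url(config.sub.redis_url, decode_responses=True)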

+ 2 - 5
ui/backend/main.py

@@ -4,7 +4,7 @@ import sys
 # To avoid coupling (a future microservice split), the grandparent directory may need to run as its own process; this directory remains a standalone UI project
 sys.path.append(str(Path(__file__).parent))
 from fastapi import FastAPI
-from routers.proxy import router, health_check_proxy_task,sub_mgr
+from routers.proxy import router,sub_mgr
 from routers.file import router as file_router
 from routers.worker import router as worker_router
 from fastapi.middleware.cors import CORSMiddleware
@@ -20,17 +20,14 @@ async def startup():
     logger.info(f"startup")
     tasks.append(asyncio.create_task(CeleryWorker().run()))
     tasks.append(asyncio.create_task(sub_mgr.startup()))
-    # asyncio.create_task(health_check_proxy_task(interval=90))
-    health_check_task_instance = asyncio.create_task(health_check_proxy_task(interval=90))
-    tasks.append(health_check_task_instance)
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """应用生命周期管理"""
     tasks = await startup()
     yield
+    await process_manager.cleanup()
     for task in tasks:
         task.cancel()
-    await process_manager.cleanup()
 
 # Create the FastAPI application instance
 app = FastAPI(

+ 4 - 18
ui/backend/routers/proxy.py

@@ -44,8 +44,7 @@ class ProxyResponse(BaseModel):
     process_info: Optional[Dict] = None
     ping: Optional[Dict[str, int]] = None
 
-async def get_proxy_response(port: int, use_cache: bool = True):
-    cache_key = f"get_proxy_response_{port}"
+async def get_proxy_response(port: int):
     porxy_model = sub_mgr.sub.proxies.get(port)
     proxy_mgr = sub_mgr.list_proxies_mgr.get(port)
     mgr_url = proxy_mgr.get_management_url()
@@ -77,7 +76,7 @@ async def get_proxy_response(port: int, use_cache: bool = True):
     )
     return result
 
-async def get_all_proxy_response(use_cache: bool = True):
+async def get_all_proxy_response(use_cache: bool = True) -> List[ProxyResponse]:
     global sub_mgr
     ret = []
     tasks = []
@@ -85,21 +84,7 @@ async def get_all_proxy_response(use_cache: bool = True):
-        tasks.append(get_proxy_response(port, use_cache))
+        tasks.append(get_proxy_response(port))
     ret = await asyncio.gather(*tasks)
     return ret
-async def health_check_proxy_task(interval: int = 80):
-    """定时检查所有代理的健康状态"""
-    while True:
-        try:
-            logger.info("Running health check...")
-            proxies = await get_all_proxy_response(use_cache=True)
-            # if not proxies:
-            #     logger.error("Health check failed: No proxies available")
-            # else:
-            #     logger.info(f"Health check succeeded: {len(proxies)} proxies are healthy")
-        except Exception as e:
-            logger.error(f"Health check failed: {e}")
-        
-        # 等待指定的时间间隔
-        await asyncio.sleep(interval)
+
 @router.get("/ping")
 async def ping_proxies() -> Dict[str, int]:
     global sub_mgr,cache
@@ -137,6 +122,7 @@ class ProxyPostResponse(BaseModel):
     err: int = 1
     msg: str = ''
     data: ProxyResponse
+
 async def auto_select_proxy(port: int):
     global sub_mgr
     ping_res = await ping_proxies()

+ 31 - 3
ui/backend/src/services/subscription_manager.py

@@ -3,12 +3,14 @@ from pathlib import Path
 from typing import Dict, Optional
 import httpx
 import yaml
+import asyncio
 from datetime import datetime
 from src.services.proxy_manager import ProxyManager
 from utils.config import PROXY_POLL_DIR,config,Sub,Proxy,Config
 from utils.mihomo import get_sub,update_config,save_yaml_dump,find_free_port
 from utils.logu import get_logger,logger
 
+
 class SubscriptionManager:
     """管理订阅的生命周期,包括下载、更新、启动、停止等操作"""
     
@@ -16,6 +18,7 @@ class SubscriptionManager:
         self.sub = config.sub
         self.config = config
         self.list_proxies_mgr: Dict[int, ProxyManager] = {}
+        # Build a ProxyManager for each configured proxy (Redis connection init is still TODO; see sub.redis_url)
         for proxy in self.sub.proxies.values():
             self.list_proxies_mgr[proxy.port] = ProxyManager(self.config.mimo_exe, proxy.file_path)
     def save_config(self):
@@ -136,8 +139,33 @@ class SubscriptionManager:
     async def startup(self):
         if not self.sub.auto_start:
             return
+        
+        # Start all proxies concurrently
+        startup_tasks = []
+        for proxy in self.sub.proxies.values():
+            if proxy.startup:
+                startup_tasks.append(self.start_proxy(proxy.port))
+        
+        # Run every startup task concurrently with gather
+        await asyncio.gather(*startup_tasks)
+        
+        # Single readiness wait (previously 3 s per proxy, now one shared wait)
+        await asyncio.sleep(3)
+        
+        # Select the configured node for each proxy, also concurrently
+        select_tasks = []
         for proxy in self.sub.proxies.values():
             if proxy.startup:
-                logger.info(f"{proxy.port} 自动启动")
-                self.select_proxy(proxy.port, proxy.name)
-                await self.start_proxy(proxy.port)
+                # Wrap each selection so one failure is logged without breaking the rest
+                async def select_wrapper(proxy):
+                    try:
+                        await self.select_proxy(proxy.port, proxy.name)
+                    except Exception as e:
+                        logger.error(f"{proxy.port} failed to select proxy {proxy.name}: {str(e)}")
+                
+                select_tasks.append(select_wrapper(proxy))
+        
+        # Run all selection tasks concurrently; return_exceptions keeps gather from cancelling siblings
+        await asyncio.gather(*select_tasks, return_exceptions=True)
+
+    
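
The updated test in tests/mytests/t_sub_mgr.py calls sub_manager.get_proxy_pool(), but that method does not appear anywhere in this diff. The following is only a guess at its shape, under the assumption that the pool lives in the Redis set proxy_pool read by tests/mytests/redis_t.py and that the URL comes from sub.redis_url:

    import redis.asyncio as redis_async

    # Hypothetical excerpt of SubscriptionManager; the real method is not in this commit.
    async def get_proxy_pool(self) -> list:
        client = redis_async.from_url(self.sub.redis_url, decode_responses=True)
        try:
            # Members are assumed to be plain "host:port" strings, matching what redis_t.py prints
            return sorted(await client.smembers("proxy_pool"))
        finally:
            await client.close()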

+ 11 - 0
ui/backend/tests/mytests/redis_t.py

@@ -0,0 +1,11 @@
+import redis
+
+# Connect to the Redis server and select database 8
+r = redis.Redis(host='localhost', port=6379, db=8)
+
+# Fetch every member of the set with SMEMBERS
+members = r.smembers('proxy_pool')
+
+# Print each member of the set
+for member in members:
+    print(member.decode('utf-8'))  # decode bytes to str
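
This test only reads the set. The writer side, wherever the pool ends up being maintained, would use SADD/SREM on the same key; a sketch with illustrative addresses (ports taken from ui/backend/config.yaml):

    import redis

    r = redis.Redis(host='localhost', port=6379, db=8)
    # Publish proxies that passed a check and drop one that failed
    r.sadd('proxy_pool', '127.0.0.1:9660', '127.0.0.1:9662')
    r.srem('proxy_pool', '127.0.0.1:9664')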

+ 2 - 8
ui/backend/tests/mytests/t_sub_mgr.py

@@ -10,15 +10,9 @@ from utils.logu import get_logger
 logger = get_logger('mytests', file=True)
 
 async def main():
-    # print(config)
-
-    # config.browser = Browser()
-    # config.save(    )
-    logger.info(f"{config.browser}")
-    
-    logger.info(f"config: {Path(config.browser.exe_path).exists()}")
-    return
     sub_manager = SubscriptionManager(config)
+    logger.info(f"{await sub_manager.get_proxy_pool()}")
+    return
     # await sub_manager.download_subscription()
     # await sub_manager.create_custom_config(port=9660)
     # await sub_manager.create_custom_config(port=9662)

+ 1 - 0
ui/backend/utils/config.py

@@ -21,6 +21,7 @@ class Proxy(BaseModel):
 class Sub(AutoLoadModel):
     url: Optional[str] = None
     start_port: Optional[int] = 9660  # Changed to int
+    redis_url: Optional[str] = 'redis://localhost:6379/8'
     file: Optional[str] = None
     temp_dir: Optional[str] = str(PROXY_POLL_DIR / "temp")  
     auto_start: Optional[bool] = True 

+ 0 - 0
ui/docs/gpt/architecture.md


+ 22 - 0
ui/docs/gpt/proxy_pool.md

@@ -1,3 +1,25 @@
+# Redis ProxyPool
+There is a user-facing page with a standalone Vue frontend and a standalone FastAPI backend; it can start, stop, and manage the local proxy pool ["127.0.0.1:9360", ...].
+The page is a local desktop app built with Vue 3 (script-setup syntax) + element-plus + FastAPI + pywebview. Although the frontend and backend run on the same machine, the code keeps a frontend/backend split in case it is later turned into a web service.
+Think of this frontend+backend pair as the management program: FastAPI serves the compiled static files at / and handles frontend interaction under /api, for example:
+- management-program server for the user page: localhost:5835
+- / serves index.html and /assets/* (frontend static files)
+- /api/proxy/proxies queries and manages the proxy pool
+- /api/worker/ manages and queries worker status; /api/worker/browser_config updates the browser path used for the workers' browser configuration
+
+
+In a separate project there is a Celery app with several worker services, each running in its own process. They are started from buttons on the user page, or auto-started alongside it; the one-click startup logic for these workers, Celery, and Redis lives in ui\backend\src\services\celery_worker.py.
+
+Workers may later be spread across other machines, but for now there is only one machine. To control starting and stopping worker tasks, an API is exposed: worker\api\search_cli.py listens for requests from any side, including the user page, and dispatches to the individual workers with calls like celery_app.send_task(...).
+
+
+Workers may or may not use the proxy pool; by default they only use the system proxy. Since the pool is managed by the user page, I plan to use Redis as the central proxy pool.
+
+It is not exposed externally; it is only used on a private network.
+
+Where in the user page is the best place to refresh the proxy pool periodically? (See the sketch below.)
+
 # proxy manager
 To keep single responsibility, create a new class under ui\backend\src to manage subscriptions.
 A subscription comes from a URL that returns a YAML file, roughly like:
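
The question above (where the user page should refresh the proxy pool periodically) is left open by this commit, which actually removes the old health_check_proxy_task. One candidate, sketched here only as an assumption and not as the project's decided approach: a small asyncio loop registered from startup() in ui/backend/main.py that reuses the existing ping_proxies() handler and rewrites the Redis set read by redis_t.py. The ping_proxies() payload shape is assumed (port keys, positive value meaning reachable); adjust to the real data.

    import asyncio
    import redis.asyncio as redis_async

    from utils.config import config
    from utils.logu import logger
    from routers.proxy import ping_proxies  # existing /api/proxy/ping handler

    async def refresh_proxy_pool(interval: int = 90):
        """Periodically rewrite the shared Redis set from the latest ping results."""
        client = redis_async.from_url(config.sub.redis_url, decode_responses=True)
        while True:
            try:
                pings = await ping_proxies()  # assumed shape: {port: latency_ms}
                alive = [f"127.0.0.1:{port}" for port, ms in pings.items() if ms and ms > 0]
                async with client.pipeline(transaction=True) as pipe:
                    pipe.delete("proxy_pool")
                    if alive:
                        pipe.sadd("proxy_pool", *alive)
                    await pipe.execute()
            except Exception as e:
                logger.error(f"proxy pool refresh failed: {e}")  # never let a failure kill the loop
            await asyncio.sleep(interval)

Wiring it up would mirror the other background tasks in startup(): tasks.append(asyncio.create_task(refresh_proxy_pool())).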

+ 2 - 0
utils/proxy_pool.py

@@ -3,5 +3,7 @@ from config.settings import PROXIES
 
 
 def get_random_proxy() -> str:
     """Get random proxy from proxy pool"""
+    if not PROXIES:
+        return None
     return random.choice(PROXIES)

+ 143 - 20
worker/api/search_cli.py

@@ -1,42 +1,161 @@
 import asyncio
-from fastapi import APIRouter, FastAPI, HTTPException
+from fastapi import APIRouter, FastAPI, HTTPException, status
 from pydantic import BaseModel
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Any
 import DrissionPage
 from worker.search_engine.drission_google_search import search_keyword_drission
 from DrissionPage import ChromiumPage
 from mylib.drission_page import load_chrome_from_ini
 from mylib.logu import logger
+from worker.celery.app import app as celery_app
+from worker.celery.crawl_client import submit_page_crawl_tasks
+
 
 app = APIRouter()
 
+class TaskResponse(BaseModel):
+    task_id: str
+    status: str
+    message: str
+
 class SearchRequest(BaseModel):
-    keyword: str
-    max_result_items: int = 200
-    skip_existing: bool = True
-    browser_config: Dict = {}
+    keywords: List[str]
+    max_result_items: Optional[int] = 200
+    skip_existing: Optional[bool] = True
+    browser_config: Optional[Dict] = {}
+    proxy_pool: Optional[List[str]] = None
+
+
+class CrawlKeywordsRequest(BaseModel):
+    keywords: List[str]
+    browser_config: Optional[Dict] = {}
+    proxy_pool: Optional[List[str]] = None
+
+
+class ConvertRequest(BaseModel):
+    result_ids: List[int]
 
 class BrowserTestRequest(BaseModel):
     browser_config: Dict = {}
     init_url: str = "https://www.google.com"
 
 @app.post("/search", summary="执行Google搜索")
-async def search(request: SearchRequest):
+def search(request: SearchRequest) -> List[TaskResponse]:
+    """提交所有关键词任务"""
+    responses = []
+    for keyword in request.keywords:
+        try:
+            result = celery_app.send_task('search_worker.drission_search', kwargs=request.model_dump())
+            logger.info(f"任务已提交: {keyword} (任务ID: {result.id})")
+            responses.append(TaskResponse(
+                task_id=result.id,
+                status="submitted",
+                message=f"Search task submitted for keyword: {keyword}"
+            ))
+        except Exception as e:
+            logger.error(f"提交任务失败 [{keyword}]: {str(e)}")
+            responses.append(TaskResponse(
+                task_id="",
+                status="failed",
+                message=str(e)
+            ))
+    return responses
+
+class CrawlItemRequest(BaseModel):
+    page_ids: List[int]
+    browser_config: Dict = {}
+
+@app.post("/crawl_item", summary="爬取单个搜索结果项")
+def crawl_item(request: CrawlItemRequest) -> List[TaskResponse]:
+    """提交页面爬取任务"""
+    responses = []
+    for page_id in request.page_ids:
+        try:
+            task_data = {
+                'page_id': page_id,
+                'browser_config': request.browser_config,
+                'overwrite': False
+            }
+            result = celery_app.send_task('crawl_worker.crawl_page_urls', kwargs=task_data)
+            responses.append(TaskResponse(
+                task_id=result.id,
+                status="submitted",
+                message=f"Crawl task submitted for page ID: {page_id}"
+            ))
+        except Exception as e:
+            logger.error(f"提交页面爬取任务失败 [页面ID {page_id}]: {str(e)}")
+            responses.append(TaskResponse(
+                task_id="",
+                status="failed",
+                message=str(e)
+            ))
+    return responses
+
+@app.post("/crawl_keywords", response_model=List[TaskResponse], summary="提交关键词URL爬取任务")
+def crawl_keywords(request: CrawlKeywordsRequest) -> List[TaskResponse]:
+    """提交关键词URL爬取任务(对应worker.celery.crawl_client.submit_crawl_tasks)"""
+    responses = []
+    for keyword in request.keywords:
+        try:
+            task_data = {
+                'keyword': keyword.strip(),
+                'browser_config': {
+                    **request.browser_config,
+                    'proxy': request.browser_config.get('proxy'),
+                    'headless': True,
+                    'verbose': False,
+                    'extra_args': ["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"]
+                },
+                'overwrite': False,
+                'proxy_pool': request.proxy_pool
+            }
+            result = celery_app.send_task('crawl_worker.crawl_keyword_urls', kwargs=task_data)
+            responses.append(TaskResponse(
+                task_id=result.id,
+                status="submitted",
+                message=f"Keyword crawl task submitted: {keyword}"
+            ))
+        except Exception as e:
+            logger.error(f"提交关键词URL爬取任务失败 [{keyword}]: {str(e)}")
+            responses.append(TaskResponse(
+                task_id="",
+                status="failed",
+                message=str(e)
+            ))
+    return responses
+
+@app.post("/convert_single", response_model=TaskResponse, summary="转换单个HTML结果")
+def convert_single(request: ConvertRequest) -> TaskResponse:
+    """提交单个HTML转换任务"""
     try:
-        logger.info(f"Starting search with parameters: {request.model_dump()}")
-        # 使用 to_thread 包装同步操作
-        result = await asyncio.to_thread(
-            search_keyword_drission,
-            keyword=request.keyword,
-            max_result_items=request.max_result_items,
-            skip_existing=request.skip_existing,
-            browser_config=request.browser_config
+        result = celery_app.send_task('html_convert_tasks_worker.convert_single_result', args=[request.result_ids[0]])
+        return TaskResponse(
+            task_id=result.id,
+            status="submitted",
+            message=f"Conversion task submitted for result ID: {request.result_ids[0]}"
         )
-        logger.info(f"Search completed with result: {result}")
-        return result
     except Exception as e:
-        logger.error(f"Error during search execution: {str(e)}")
-        raise HTTPException(status_code=500, detail=str(e))
+        logger.error(f"提交HTML转换任务失败 [结果ID {request.result_ids[0]}]: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=str(e)
+        )
+
+@app.post("/convert_all", response_model=TaskResponse, summary="转换所有HTML结果")
+def convert_all() -> TaskResponse:
+    """提交批量HTML转换任务"""
+    try:
+        result = celery_app.send_task('html_convert_tasks.convert_all_results')
+        return TaskResponse(
+            task_id=result.id,
+            status="submitted",
+            message="Bulk conversion task submitted for all results"
+        )
+    except Exception as e:
+        logger.error(f"提交批量HTML转换任务失败: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=str(e)
+        )
 
 @app.post("/browser/test", summary="测试浏览器启动")
 async def test_browser(request: BrowserTestRequest):
@@ -48,7 +167,11 @@ async def test_browser(request: BrowserTestRequest):
         logger.error(f"Error during browser test: {str(e)}")
         raise HTTPException(status_code=500, detail=str(e))
 async def test_browser_launch(browser_config: Dict, init_url: str) -> ChromiumPage:
-    page = load_chrome_from_ini(**browser_config)
+    # Pass any configured proxy explicitly, without also handing 'proxy' to ** twice
+    # (which would raise "got multiple values for keyword argument 'proxy'")
+    browser_config = dict(browser_config)
+    proxy = browser_config.pop('proxy', None)
+    page = load_chrome_from_ini(**browser_config, proxy=proxy)
     page.get(init_url)
     return page
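
For reference, the reworked /search endpoint now takes a list of keywords and returns one TaskResponse per submission. A minimal client call; the base URL, router prefix, and example values are assumptions (neither the mount point nor the serving port is shown in this diff):

    import httpx

    resp = httpx.post(
        "http://127.0.0.1:8000/search",   # assumed host/port and router prefix
        json={
            "keywords": ["example keyword one", "example keyword two"],
            "max_result_items": 100,
            "proxy_pool": ["127.0.0.1:9660", "127.0.0.1:9662"],
        },
        timeout=30,
    )
    for task in resp.json():
        print(task["task_id"], task["status"], task["message"])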
 

+ 5 - 3
worker/celery/crawl_tasks.py

@@ -3,15 +3,16 @@ import sys
 from worker.celery.app import app
 from worker.crawl_pages.crawl_urls import URLCrawler
 from mylib.logu import get_logger
+import utils.proxy_pool as proxy_pool_module
 from crawl4ai import BrowserConfig
 logger = get_logger('crawl_worker')
 
 @app.task(name='crawl_worker.crawl_keyword_urls')
-def crawl_keyword_urls_task(keyword: str, browser_config: dict = None, overwrite: bool = False):
+def crawl_keyword_urls_task(keyword: str, browser_config: dict = None, overwrite: bool = False, proxy_pool:list[str]=None):
     """Celery task to crawl all URLs for a specific keyword"""
     if sys.platform == 'win32':
         asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
-    
+    # Make the task-supplied pool visible to utils.proxy_pool.get_random_proxy;
+    # a plain local assignment would only shadow the imported PROXIES name.
+    if proxy_pool:
+        proxy_pool_module.PROXIES = proxy_pool
     async def _execute_crawl():
         try:
             # Convert dict config to BrowserConfig if provided
@@ -37,10 +38,11 @@ def crawl_keyword_urls_task(keyword: str, browser_config: dict = None, overwrite
         asyncio.set_event_loop_policy(None)
 
 @app.task(name='crawl_worker.crawl_page_urls')
-def crawl_page_urls_task(page_id: int, browser_config: dict = None, overwrite: bool = False):
+def crawl_page_urls_task(page_id: int, browser_config: dict = None, overwrite: bool = False, proxy_pool:list[str]=None):
     """Celery task to crawl all URLs for a specific search page"""
     if sys.platform == 'win32':
         asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
+    # Route the task-supplied pool to utils.proxy_pool.get_random_proxy, as above
+    if proxy_pool:
+        proxy_pool_module.PROXIES = proxy_pool
     
     async def _execute_crawl():
         try:

+ 5 - 4
worker/celery/tasks.py

@@ -6,17 +6,18 @@ import sys
 import asyncio
 import httpx
 from utils.proxy_pool import get_random_proxy
-
+import utils.proxy_pool as proxy_pool_module
 
 @app.task(name='search_worker.drission_search')
-def drission_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: dict={}):
+def drission_search_task(keyword: str, max_result_items: int=200, skip_existing: bool=True, browser_config: dict={}, proxy_pool: list[str] = None):
     """Asynchronous keyword-search task."""
     if sys.platform == 'win32':
         asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
-    
+    # Route get_random_proxy() to the task-supplied pool; a plain local assignment
+    # would only shadow the imported name and have no effect.
+    if proxy_pool:
+        proxy_pool_module.PROXIES = proxy_pool
     async def _execute_search():
         try:
-            browser_config.update({'proxy': get_random_proxy()})
+            if proxy_pool:
+                browser_config.update({'proxy': get_random_proxy()})
             logger.info(f"browser_config: {browser_config}")
             logger.info(f"开始处理关键词搜索任务: {keyword}")
             result = search_keyword_drission(keyword, max_result_items=max_result_items, skip_existing=skip_existing, browser_config=browser_config)

+ 0 - 1
worker/crawl_pages/crawl_urls.py

@@ -63,7 +63,6 @@ class URLCrawler:
                 headless=True,
                 verbose=False,
                 extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
-                proxy=await self.get_random_proxy()
             )
         
         crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
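
With the hard-coded proxy=await self.get_random_proxy() gone from the default BrowserConfig, a caller that still wants a proxied crawl passes the proxy in through browser_config. A sketch of the caller side; the addresses and the URL scheme are illustrative and should mirror whatever format get_random_proxy previously returned:

    import random
    from crawl4ai import BrowserConfig

    proxy_pool = ["127.0.0.1:9660", "127.0.0.1:9662"]   # illustrative addresses

    browser_config = BrowserConfig(
        headless=True,
        verbose=False,
        extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
        proxy=f"http://{random.choice(proxy_pool)}",    # format assumption
    )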