Browse source

Complete custom celery backend address startup

mrh 9 months ago
parent
commit
9b4ebbd70b

+ 7 - 5
ui/backend/config.yaml

@@ -20,27 +20,29 @@ sub:
       startup: true
     9662:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9662.yaml
-      name: "\U0001F1F8\U0001F1EC\u4E9A\u9A6C\u900A\u65B0\u52A0\u57612"
+      name: "\u81EA\u52A8\u9009\u62E9"
       port: 9662
       startup: true
     9664:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9664.yaml
-      name: "\U0001F1F3\U0001F1F1\u8377\u5170Eygelshoven | BT\u4E0B\u8F7D-0.1\u500D\
-        \u7387"
+      name: "\u81EA\u52A8\u9009\u62E9"
       port: 9664
       startup: true
     9666:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9666.yaml
-      name: "\U0001F1EB\U0001F1F7\u6CD5\u56FD\u9A6C\u8D5B"
+      name: "\u81EA\u52A8\u9009\u62E9"
       port: 9666
       startup: true
     9668:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9668.yaml
-      name: "\U0001F1F3\U0001F1F1\u8377\u5170\u963F\u59C6\u65AF\u7279\u4E39"
+      name: "\u81EA\u52A8\u9009\u62E9"
       port: 9668
       startup: true
   redis_url: redis://localhost:6379/8
   start_port: 9660
   temp_dir: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp
   url: https://www.yfjc.xyz/api/v1/client/subscribe?token=b74f2207492053926f7511a8e474048f
+worker_backend:
+  host: localhost
+  port: 5836
 worker_backend_py: G:\code\upwork\zhang_crawl_bio\worker\api\worker_server.py

+ 16 - 15
ui/backend/src/services/celery_worker.py

@@ -1,23 +1,19 @@
 from pathlib import Path
 import subprocess
 import sys
-from typing import Dict,Any, List, Optional
-
+from typing import Dict, Any, List, Optional
 import httpx
 from pydantic import BaseModel,field_validator,Field
 import redis
-from src.services.subscription_manager import SubscriptionManager
-from utils.config import config,APP_PATH
+from utils.config import config, APP_PATH
 from utils.process_mgr import process_manager
+from utils.logu import logger
 
-import asyncio
-from utils.logu import get_logger,logger
-import os
 WORKER_DIR_BASE = APP_PATH.parent.parent
 py_client: Optional[Dict[str,Any]] = {
-    'search': WORKER_DIR_BASE / 'worker\celery\client.py' ,
-    'crawl': WORKER_DIR_BASE / 'worker\celery\crawl_client.py',
-    'convert': WORKER_DIR_BASE / 'worker\celery\html_convert_tasks.py'
+    'search': WORKER_DIR_BASE / r'worker\celery\client.py',
+    'crawl': WORKER_DIR_BASE / r'worker\celery\crawl_client.py',
+    'convert': WORKER_DIR_BASE / r'worker\celery\html_convert_tasks.py'
 }
 
 
@@ -39,22 +35,27 @@ class WorkerModel(BaseModel):
 
 class CeleryWorker:
     def __init__(self, python_exe: str=sys.executable):
+        self.python_exe = python_exe
         self.workers_model: Dict[str, WorkerModel] = {}
         for worker_name in py_client.keys():
             model = WorkerModel(name=worker_name)
             model.cmd = [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'worker', '-Q',model.queue_name, f'--hostname={worker_name}@%h']
             self.workers_model[worker_name] = model
     async def run(self):
-        python_exe = sys.executable
-        logger.info(f"{WORKER_DIR_BASE}")
+        python_exe = self.python_exe
+        logger.info(f"WORKER_DIR_BASE {WORKER_DIR_BASE}")
         # return
         redis_cmd = [config.redis_exe]
-        logger.info(f"{redis_cmd}")
         flower_db = WORKER_DIR_BASE / 'output' / 'flower_db'
         await process_manager.start_process("redis_cmd", redis_cmd, cwd=WORKER_DIR_BASE)
 
-        worker_backend_py = config.worker_backend_py
-        worker_backend_cmd = [python_exe, worker_backend_py]
+        worker_backend_cmd = [
+            self.python_exe,
+            config.worker_backend_py,
+            "--host", config.worker_backend.host,
+            "--port", str(config.worker_backend.port)
+        ]
+
         await process_manager.start_process("worker_backend", worker_backend_cmd, cwd=WORKER_DIR_BASE)
 
         # G:\code\upwork\zhang_crawl_bio\crawl_env\python.exe -m celery -A worker.celery.app flower --address=127.0.0.1 --persistent=True --db=".\output\flower_db"

+ 15 - 19
ui/backend/utils/config.py

@@ -1,6 +1,6 @@
 import yaml
 from pathlib import Path
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from typing import List, Dict, Union,Optional,Any
 from utils.pydantic_auto_field import AutoLoadModel
 APP_PATH = Path(__file__).parent.parent
@@ -17,20 +17,19 @@ class Proxy(BaseModel):
     file_path: Optional[str] = ''
     startup: Optional[bool] = False
 
-
 class Sub(AutoLoadModel):
     url: Optional[str] = None
-    start_port: Optional[int] = 9660  # Changed to int
+    start_port: Optional[int] = 9660
     redis_url: Optional[str] = 'redis://localhost:6379/8'
     file: Optional[str] = None
     temp_dir: Optional[str] = str(PROXY_POLL_DIR / "temp")  
     auto_start: Optional[bool] = True 
     proxies: Optional[Dict[Union[int,str], Proxy]] = {}
+    
     def __init__(self, **data):
-            super().__init__(**data)
-            # Convert proxies dictionary values to Proxy objects
-            if self.proxies:
-                self.proxies = {k: Proxy(**v) if isinstance(v, dict) else v for k, v in self.proxies.items()}
+        super().__init__(**data)
+        if self.proxies:
+            self.proxies = {k: Proxy(**v) if isinstance(v, dict) else v for k, v in self.proxies.items()}
 
 class Browser(BaseModel):
     exe_path: Optional[str] = str(REPO_BASE_DIR / r"download\chrome-win\chrome.exe")
@@ -49,11 +48,13 @@ class Config(BaseModel):
     sqluri: Optional[str] = r'G:\code\upwork\zhang_crawl_bio\output\temp.db'
     browser: Optional[Browser] = Browser()
     backend: Optional[Backend] = Backend()
-    redis_port: Optional[int] = None  # Changed to int
+    worker_backend: Optional[Backend] = Field(default_factory=lambda: Backend(host="localhost", port=8003))
+    redis_port: Optional[int] = None
+
     def save(self):
         config_path = get_config_path()
         with open(config_path, "w", encoding="utf-8") as file:
-            yaml.dump(self.model_dump(), file, encoding="utf-8", )
+            yaml.dump(self.model_dump(), file)
         return self
             
 def get_config_path():
@@ -66,19 +67,14 @@ def read_config(config_path: Path):
         return config
     with open(config_path, "r", encoding="utf-8") as file:
         config_dict = yaml.safe_load(file)
-    config = Config(**config_dict)
-    return config
-config = read_config(get_config_path())
+    return Config(**config_dict)
 
-import os
-
-WORKER_SERVICE_URL = os.getenv("WORKER_SERVICE_URL", "http://localhost:8003")
+config = read_config(get_config_path())
+WORKER_SERVICE_URL = f"http://{config.worker_backend.host}:{config.worker_backend.port}"
 
 def main():
     print(config)
-    config.browser = Browser()
-    config.save(
-        
-    )
+    config.save()
+
 if __name__ == "__main__":
     main()

+ 46 - 0
ui/docs/gpt/worker_api.md

@@ -1,3 +1,49 @@
+# Submitting tasks
+The ui directory holds the user-facing pages, with both frontend and backend code.
+
+Everything outside the ui directory is a separate, standalone project, which I call the celery project; its code and the ui directory's code cannot call each other directly.
+
+The celery project also has a backend service, worker\api\worker_server.py. The ui project's ui\backend\src\services\celery_worker.py can start that process and then send requests to it.
+
+For example, when the user clicks a button in the ui project, a search request goes to the /ctrl endpoint in the ui backend (ui\backend\routers\worker.py), and the ui backend in turn calls the /search endpoint in the celery project's worker\api\worker.py.
+The celery project reads the tasks from its own database and submits them to redis.
+
+When the ui backend calls the celery backend, I want the request parameters to depend on whether select_proxy is system or pool. If it is system, no proxy needs to be passed, because the worker behind celery's /search launches a browser that uses the system proxy by default. If it is pool, the proxy-pool data must be passed along.
+The proxy-pool data can be fetched from the ui backend's /api/proxy/proxies-pool endpoint at config.backend.host:config.backend.port. For example:
+```
+curl -X 'GET' \
+  'http://localhost:5835/api/proxy/proxies-pool?force_refresh=false' \
+  -H 'accept: application/json'
+```
+
+Response (HTTP 200):
+
+```
+{
+  "proxies": [
+    "127.0.0.1:9660",
+    "127.0.0.1:9662",
+    "127.0.0.1:9664",
+    "127.0.0.1:9666",
+    "127.0.0.1:9668"
+  ],
+  "cached": true
+}
+```
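+
+A minimal sketch of that branching in the ui backend (the helper name, the payload fields, and the exact /search path are assumptions for illustration, not the project's actual code):
+
+```python
+import httpx
+
+from utils.config import config, WORKER_SERVICE_URL
+
+async def forward_search(select_proxy: str) -> dict:
+    """Hypothetical helper: forward a search request to the celery backend."""
+    payload: dict = {"select_proxy": select_proxy}
+    if select_proxy == "pool":
+        # Fetch the proxy pool from the ui backend's own proxy endpoint.
+        pool_url = (f"http://{config.backend.host}:{config.backend.port}"
+                    "/api/proxy/proxies-pool?force_refresh=false")
+        async with httpx.AsyncClient() as client:
+            resp = await client.get(pool_url)
+            resp.raise_for_status()
+            payload["proxies"] = resp.json()["proxies"]
+    # For "system", send no proxy data: the worker's browser falls back
+    # to the system proxy by default.
+    async with httpx.AsyncClient() as client:
+        resp = await client.post(f"{WORKER_SERVICE_URL}/search", json=payload)
+        resp.raise_for_status()
+        return resp.json()
+```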
+
+# backend host
+As above: the ui directory holds the user-facing pages, and everything outside it is the separate celery project, whose backend service worker\api\worker_server.py can be started as a child process by the ui project's ui\backend\src\services\celery_worker.py.
+
+Now I want not only to start it, but also to customize the celery backend's host and port, driven by the ui project's Config.worker_backend setting. Help me modify the code to make this work.
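+
+A quick way to verify the result (a sketch; it assumes the /health handler from this commit, which echoes the bound host and port):
+
+```python
+import httpx
+
+from utils.config import config
+
+wb = config.worker_backend
+resp = httpx.get(f"http://{wb.host}:{wb.port}/health")
+resp.raise_for_status()
+# e.g. {"status": "healthy", "host": "localhost", "port": 5836, ...}
+print(resp.json())
+```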
+
 # worker api
 The code above seems to have quite a few problems; help me assess them:
 - The project uses SQLModel to drive the SQLite database, with DB_URL = os.environ.get('DB_URL') or f"sqlite:///{OUTPUT_DIR}/temp.db", which is also used elsewhere, and almost all of that code is synchronous. Only the publicly exposed API uses FastAPI's async endpoints (one common bridging pattern is sketched below).
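+
+One standard way to reconcile the two (a sketch with a hypothetical `Task` table, not the project's actual schema): keep the SQLModel calls synchronous and declare the FastAPI endpoint with plain `def`, so FastAPI runs it in its threadpool and the event loop is never blocked:
+
+```python
+from typing import Optional
+
+from fastapi import FastAPI
+from sqlmodel import Field, Session, SQLModel, create_engine, select
+
+class Task(SQLModel, table=True):  # hypothetical table for illustration
+    id: Optional[int] = Field(default=None, primary_key=True)
+    keyword: str
+
+engine = create_engine("sqlite:///output/temp.db")  # stand-in for DB_URL
+app = FastAPI()
+
+@app.get("/tasks")
+def list_tasks():  # plain def -> FastAPI executes it in a worker thread
+    with Session(engine) as session:
+        return session.exec(select(Task)).all()
+```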

+ 2 - 2
ui/fontend/src/components/WorkerCtrl.vue

@@ -84,7 +84,7 @@ const workerStatus = computed(() => {
     if (workerType in status) {
       status[workerType] = worker['status'] === true
     }
-    console.log('Worker状态:', workerType, status[workerType])
+    // console.log('Worker状态:', workerType, status[workerType])
   })
 
   return status
@@ -96,7 +96,7 @@ async function fetchWorkerStatus() {
     if (!response.ok) throw new Error('请求失败')
     const data = await response.json()
   workers.value = data.celery_status?.workers?.data || []
-  console.log('Worker 状态信息:', workers.value)
+  // console.log('Worker 状态信息:', workers.value)
   } catch (error) {
     ElMessage.error('获取Worker状态失败')
     console.error('Fetch worker status error:', error)

+ 0 - 1
worker/api/search_cli.py → worker/api/worker_router.py

@@ -3,7 +3,6 @@ from fastapi import APIRouter, FastAPI, HTTPException, status
 from pydantic import BaseModel
 from typing import Dict, List, Optional, Any
 import DrissionPage
-from worker.search_engine.drission_google_search import search_keyword_drission
 from DrissionPage import ChromiumPage
 from mylib.drission_page import load_chrome_from_ini
 from mylib.logu import logger

+ 28 - 11
worker/api/worker_server.py

@@ -1,32 +1,49 @@
 from fastapi import FastAPI
+import argparse
 from worker.api.excel_load import app as excel_load_app
-from worker.api.search_cli import app as search_cli_app
+from worker.api.worker_router import app as worker_app
 from fastapi.middleware.cors import CORSMiddleware
 from config.settings import DB_URL, GOOGLE_SEARCH_DIR
+
+# Defaults so /health responds even when the app is launched by an external
+# runner (e.g. the uvicorn CLI) and the __main__ argparse block never runs.
+args = argparse.Namespace(host="127.0.0.1", port=8003)
+
 app = FastAPI(
     title="搜索微服务",
     description="提供关键词导入和搜索功能的统一API接口",
     version="1.0.0"
 )
+
+# Add CORS middleware
 app.add_middleware(
     CORSMiddleware,
-    allow_origins=["*"],  # 允许所有域名访问
+    allow_origins=["*"],
     allow_credentials=True,
-    allow_methods=["*"],  # 允许所有方法(GET, POST, PUT, DELETE 等)
-    allow_headers=["*"],  # 允许所有头部
+    allow_methods=["*"],
+    allow_headers=["*"],
 )
 
-# 合并两个子应用
+# Include routers
 app.include_router(excel_load_app, prefix="/keywords")
-app.include_router(search_cli_app, prefix="/search")
+app.include_router(worker_app, prefix="/search")
 
 @app.get("/health")
 async def health_check():
-    """服务健康检查"""
-    return {"status": "healthy", 'db_url': DB_URL, 'google_search_dir':GOOGLE_SEARCH_DIR, 'msg': 'ok'}
+    return {
+        "status": "healthy",
+        "host": args.host,
+        "port": args.port,
+        "db_url": DB_URL,
+        "google_search_dir": GOOGLE_SEARCH_DIR
+    }
 
 if __name__ == "__main__":
     import uvicorn
-    # python .\worker\api\worker_server.py
-    # python -m uvicorn worker:api:worker_server:app --reload
-    uvicorn.run(app, host="127.0.0.1", port=8003)
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--host", default="127.0.0.1", help="Binding host")
+    parser.add_argument("--port", type=int, default=8003, help="Binding port")
+    args = parser.parse_args()
+    
+    uvicorn.run(
+        app,
+        host=args.host,
+        port=args.port,
+        log_config=None
+    )

+ 0 - 0
worker/celery/tasks.py → worker/celery/search_tasks.py