Просмотр исходного кода

完成批量启动 worker celery 和健康检查

mrh 1 год назад
Родитель
Commit
5f55f0d360

+ 5 - 3
ui/backend/main.py

@@ -11,13 +11,14 @@ from fastapi.middleware.cors import CORSMiddleware
 from contextlib import asynccontextmanager
 from utils.process_mgr import process_manager
 from utils.config import config,WORKER_DIR_BASE
-
+from src.services.celery_worker import CeleryWorker
+from utils.logu import get_logger,logger
 async def startup():
     """应用启动时执行的操作"""
     global process_manager,config
     tasks = []
-    resdis_task = asyncio.create_task(process_manager.start_process("redis", [config.redis_exe], cwd=WORKER_DIR_BASE))
-    tasks.append(resdis_task)
+    logger.info(f"startup")
+    tasks.append(asyncio.create_task(CeleryWorker().run()))
     # asyncio.create_task(health_check_proxy_task(interval=90))
     health_check_task_instance = asyncio.create_task(health_check_proxy_task(interval=90))
     tasks.append(health_check_task_instance)
@@ -34,6 +35,7 @@ async def lifespan(app: FastAPI):
 app = FastAPI(
     description="",
     version="1.0.0",
+    lifespan=lifespan
 )
 app.add_middleware(
     CORSMiddleware,

+ 5 - 1
ui/backend/routers/worker.py

@@ -9,6 +9,7 @@ from typing import Optional
 import pathlib
 import os
 from utils.config import WORKER_SERVICE_URL,config,Browser
+from src.services.celery_worker import CeleryWorker
 from utils.logu import logger
 
 router = APIRouter()
@@ -31,6 +32,7 @@ class PathValidationResponse(BaseModel):
 class ResponseStatus(BaseModel):
     endpoint: Endpoint = Endpoint()
     browser_config: Optional[Browser] = None
+    celery_status: Optional[Dict] = {}
     
 @router.post("/browser_config", tags=["worker"])
 async def update_browser_config(new_config: Browser):
@@ -97,8 +99,10 @@ async def status():
     except Exception as e:
         logger.error(e)
         health = {"err": 1, "msg": str(e)}
+    celery_worker = CeleryWorker()
+    celery_status = await celery_worker.check_worker_status()
     """获取当前请求的端点"""
-    return ResponseStatus(endpoint=Endpoint(health=health), browser_config=config.browser)
+    return ResponseStatus(endpoint=Endpoint(health=health), browser_config=config.browser, celery_status=celery_status)
 
 
 async def health_check():

+ 66 - 0
ui/backend/src/services/celery_worker.py

@@ -0,0 +1,66 @@
+from pathlib import Path
+import sys
+from typing import Dict,Any
+
+import httpx
+import redis
+from src.services.subscription_manager import SubscriptionManager
+from utils.config import config,APP_PATH
+from utils.process_mgr import process_manager
+
+import asyncio
+from utils.logu import get_logger,logger
+import os
+
+
+class CeleryWorker:
+    async def run(self):
+        python_exe = sys.executable
+        WORKER_DIR_BASE = APP_PATH.parent.parent
+        logger.info(f"{WORKER_DIR_BASE}")
+        # return
+        redis_cmd = [config.redis_exe]
+        logger.info(f"{redis_cmd}")
+        flower_db = WORKER_DIR_BASE / 'output' / 'flower_db'
+        await process_manager.start_process("redis_cmd", redis_cmd, cwd=WORKER_DIR_BASE)
+        # G:\code\upwork\zhang_crawl_bio\crawl_env\python.exe -m celery -A worker.celery.app flower --address=127.0.0.1 --persistent=True --db=".\output\flower_db"
+        flower_cmd = [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'flower', '--address=127.0.0.1', '--persistent=True', f'--db={flower_db}']
+        await process_manager.start_process("flower", flower_cmd, cwd=WORKER_DIR_BASE)
+        proces = process_manager.processes.get("flower").get('process')
+
+        search_worker_name = 'search'
+        crawl_worker_name = 'crawl'
+        convert_worker_name = 'convert'
+        worker_list = [search_worker_name, crawl_worker_name, convert_worker_name]
+        for worker_name in worker_list:
+            await process_manager.start_process(f"{worker_name}_worker", [python_exe, '-m', 'celery', '-A', 'worker.celery.app', 'worker', '-Q',f'{worker_name}_queue', f'--hostname={worker_name}@%h'], cwd=WORKER_DIR_BASE)
+
+    async def check_worker_status(self) -> Dict[str, Any]:
+        flower_url = "http://127.0.0.1:5555/workers?json=1"
+        async with httpx.AsyncClient() as client:
+            try:
+                response = await client.get(flower_url)
+                response.raise_for_status()
+                workers_status = response.json()
+                # 检查 Redis 状态
+                redis_status = await self.check_redis_status()
+
+                # 合并 Worker 状态和 Redis 状态
+                return {"err": 0, "msg": "success", "workers": workers_status, "redis": redis_status}
+            except httpx.HTTPStatusError as e:
+                return {"err": 1, "msg": f"HTTP error occurred: {e}"}
+            except Exception as e:
+                return {"err": 1, "msg": f"An error occurred: {e}"}
+
+    async def check_redis_status(self) -> Dict[str, Any]:
+        # 检查 Redis 服务是否可访问
+        try:
+            redis_client = redis.Redis(host='127.0.0.1', port=6379, db=1)
+            ping_response = redis_client.ping()
+            if ping_response:
+                return {"err": 0, "msg": "Redis is running and accessible"}
+            else:
+                return {"err": 1, "msg": "Redis is running but not responding correctly"}
+        except Exception as e:
+            return {"err": 1, "msg": f"Failed to connect to Redis: {e}"}
+        

+ 19 - 0
ui/backend/tests/mytests/celery_worker_t.py

@@ -0,0 +1,19 @@
+import asyncio
+import json
+from pathlib import Path
+import sys
+import sys
+# 为了避免耦合,微服务,可能确实要将上级的上级目录作为一个单独的进程来处理,此目录作为一个单独的UI项目
+sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
+from src.services.celery_worker import CeleryWorker
+from utils.logu import get_logger
+logger = get_logger('mytests', file=True)
+
+async def main():
+    worker = CeleryWorker()
+    status = await worker.check_worker_status()
+    json_dump = json.dumps(status, indent=4)
+    logger.info(json_dump)
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 4 - 1
ui/docs/gpt/work_flow.md

@@ -3,7 +3,10 @@
 我现在有一个浏览器自动化任务项目,用 DrisstionPage 进行驱动浏览器。现在已经完成了所有代码,不过各个代码函数方法都是单一职责的,并没有复杂的调度和记录重试、监控、中断继续等等功能。因为自动化场景中比较复杂,稍有错误便无法继续进行:
 有访问主页(是否成功访问、是否弹出验证码)、输入关键词(是否能找到输入框)、回车搜索(是否有验证码)、将搜索结果解析并保存到本地(是否能解析成功标题、内容、网址等信息,是否存在下一页、如不存在,任务可标记为完成)、点击下一页、解析并保存到本地(是否能解析成功标题、内容、网址等信息,是否还有下一页)。。。
 
-我看到任何一个步骤出错都可能导致失败,并且程序的进度保存、避免出错后重头再来、监控进度、是否重试、失败的原因、在哪一步导致失败,需要的程序逻辑非常繁杂,我认为手写全部代码不是好选择。我看了 airflow 、prefect 等框架,发现他们可以记录工作流,但是不确定他们是否是 RPA 自动化爬虫任务的最佳选择框架?你能否给我推荐哪些最佳开源工具框架,给我最佳方案
+我看到任何一个步骤出错都可能导致失败,并且程序的进度保存、避免出错后重头再来、监控进度、是否重试、失败的原因、在哪一步导致失败,需要的程序逻辑非常繁杂,我认为手写全部代码不是好选择。我看了 airflow 、prefect V3 、luigi 等框架,发现他们可以记录工作流,但是不确定他们是否是 RPA 自动化爬虫任务的最佳选择框架?
+我试了 prefect v3 版本,发现它无法在控制台查看和记录 task 输入输出,只能手动从代码中调用 task result ,似乎不够好用。
+我试了 airflow 发现它仅支持 Linux 系统, 无法很好支持 Windows 直接运行。
+你能否给我推荐哪些最佳开源工具框架,给我最佳方案
 
 ---