| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import subprocess
- from fastapi import APIRouter, HTTPException, Request, Depends
- from fastapi.responses import JSONResponse, StreamingResponse
- import httpx
- import os
- from pathlib import Path
- from typing import Dict
- from pydantic import BaseModel
- from typing import Optional
- import pathlib
- import os
- from utils.config import WORKER_SERVICE_URL,config,Browser
- from src.services.celery_worker import celery_worker
- from utils.logu import logger
# Router on which the worker-management endpoints of this module are registered.
router = APIRouter()
class Endpoint(BaseModel):
    """Description of the worker service endpoint and its health."""

    # Base URL of the worker service; defaults to the configured WORKER_SERVICE_URL.
    service_url: str = WORKER_SERVICE_URL

    # Token associated with the endpoint.
    # NOTE(review): the default looks like a placeholder value; confirm it is
    # never relied on outside development.
    token: Optional[str] = "temp_token_123"

    # Health information; the `/status` handler fills this with the result of
    # `health_check()` or with an error dict when that call fails.
    health: Optional[Dict] = {}
class PathValidationRequest(BaseModel):
    """Request model holding a single file path."""

    # Path supplied by the client (presumably the path to be validated).
    file_path: str
class PathValidationResponse(BaseModel):
    """Response model describing the outcome of a path validation."""

    # Whether the path passed validation.
    valid: bool

    # Human-readable explanation of the result.
    message: str

    # Optional file name and size; both default to None when not provided.
    file_name: Optional[str] = None
    file_size: Optional[int] = None
class ResponseStatus(BaseModel):
    """Payload returned by the `/status` endpoint."""

    # Worker-service endpoint description; a default `Endpoint()` when omitted.
    endpoint: Endpoint = Endpoint()

    # Browser settings; the `/status` handler passes `config.browser` here.
    browser_config: Optional[Browser] = None

    # Status dict obtained from `celery_worker.check_worker_status()`.
    celery_status: Optional[Dict] = {}
-
def transform_workers_data(workers_data: list) -> list:
    """Convert raw worker records into the format the frontend expects.

    Each record is reduced to a fixed set of fields; any other keys are
    dropped and a field missing from a record is reported as ``None``.

    Args:
        workers_data: Worker records (dict-like objects supporting ``.get``).
            ``None`` or an empty sequence is treated as "no workers".

    Returns:
        A new list with one dict per input record, containing exactly the keys
        ``hostname``, ``worker-online``, ``pid``, ``active``, ``processed``
        and ``status``, in that order.
    """
    # Guard against a missing payload so callers do not have to pre-check.
    if not workers_data:
        return []

    # Single source of truth for the fields exposed to the frontend.
    exposed_fields = (
        "hostname",
        "worker-online",
        "pid",
        "active",
        "processed",
        "status",
    )
    return [
        {field: worker.get(field) for field in exposed_fields}
        for worker in workers_data
    ]
@router.post("/browser_config", tags=["worker"])
async def update_browser_config(new_config: Browser):
    """Update the browser configuration (with automatic path validation).

    The submitted ``exe_path`` is normalised (surrounding double quotes are
    removed) and must point to an existing, executable regular file whose
    extension is ``.exe``. When every check passes, the normalised path is
    written back into ``new_config``, which then becomes ``config.browser``
    and is saved.

    Returns:
        On success, a dict with ``status`` and the stored ``browser_config``.
        On a failed check, a 400 ``JSONResponse`` with ``status``, ``message``
        and ``detail``. If any exception is raised while checking or saving,
        a 500 ``JSONResponse`` carrying the exception text.
    """
    raw_path = new_config.exe_path
    # Only unwrap the path when it is quoted on both ends.
    is_quoted = raw_path.startswith('"') and raw_path.endswith('"')
    candidate = Path(raw_path.strip('"') if is_quoted else raw_path)

    # Ordered rejection rules: (predicate that is True on failure, message,
    # detail). Predicates are callables so each one only runs when all the
    # previous rules have passed.
    rejection_rules = (
        (
            lambda: not candidate.exists(),
            "路径不存在",
            "请检查浏览器可执行文件路径是否正确",
        ),
        (
            lambda: not candidate.is_file(),
            "路径不是文件",
            "请选择一个可执行文件而不是目录",
        ),
        (
            lambda: not os.access(candidate, os.X_OK),
            "文件不可执行",
            "请检查文件权限或是否损坏",
        ),
        (
            lambda: candidate.suffix.lower() != '.exe',
            "仅支持可执行文件 (.exe)",
            "请选择扩展名为.exe的浏览器可执行文件",
        ),
    )

    try:
        for is_rejected, message, detail in rejection_rules:
            if is_rejected():
                return JSONResponse(
                    status_code=400,
                    content={
                        "status": "error",
                        "message": message,
                        "detail": detail,
                    },
                )

        # All checks passed: persist the configuration with the cleaned path.
        new_config.exe_path = str(candidate)
        config.browser = new_config
        config.save()
        return {"status": "success", "browser_config": config.browser}
    except Exception as exc:
        logger.error(f"浏览器路径验证失败: {str(exc)}")
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "message": f"路径验证异常: {str(exc)}",
            },
        )
@router.post("/reset_browser_config", tags=["worker"])
async def reset_browser_config():
    """Reset the browser configuration to its defaults.

    Replaces ``config.browser`` with a freshly constructed ``Browser()``,
    saves the configuration and returns the stored value.
    """
    default_browser = Browser()
    config.browser = default_browser
    config.save()

    return dict(status="success", browser_config=config.browser)
@router.get("/status", tags=["worker"])
async def status() -> ResponseStatus:
    """Report worker-service health, browser config and Celery worker status.

    A failing health check does not fail the request: the exception is logged
    and reported as ``{"err": 1, "msg": ...}`` in the endpoint's ``health``.
    When the Celery status contains worker data, it is reduced to the
    frontend format via ``transform_workers_data``.
    """
    try:
        health = await health_check()
    except Exception as exc:
        logger.error(exc)
        health = {"err": 1, "msg": str(exc)}

    celery_status = await celery_worker.check_worker_status()

    # Convert the worker list in place, but only when there is one.
    workers = celery_status.get("workers")
    if workers and workers.get("data"):
        workers["data"] = transform_workers_data(workers["data"])

    logger.info(f"Filtered celery_status: {celery_status}")

    endpoint = Endpoint(health=health)
    return ResponseStatus(
        endpoint=endpoint,
        browser_config=config.browser,
        celery_status=celery_status,
    )
class StartupRequest(BaseModel):
    """Request body of the `/ctrl` worker-control endpoint."""

    # Name handed to `celery_worker.start_worker` / `stop_worker`.
    worker_name: str

    # Requested operation; the `/ctrl` handler accepts "start", "stop" and
    # "clean" and rejects anything else with HTTP 400.
    action: str

    # Extra payload; the `/ctrl` handler only logs it for the "clean" action.
    data: Optional[Dict] = {}
@router.post("/ctrl", tags=["worker"])
async def start_worker(request: StartupRequest):
    """Start, stop or "clean" a worker and return the refreshed worker status.

    Raises:
        HTTPException: 400 when ``request.action`` is not ``"start"``,
            ``"stop"`` or ``"clean"``; 500 when the refreshed status reports
            ``err == 1``.
    """
    action = request.action

    # Reject unknown actions before touching any worker.
    if action not in ("start", "stop", "clean"):
        raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")

    if action == "start":
        await celery_worker.start_worker(request.worker_name)
    elif action == "stop":
        await celery_worker.stop_worker(request.worker_name)
    else:
        # "clean" currently only records the supplied payload.
        logger.info(f"clean {request.data}")

    logger.info(f"{request.action} {request.worker_name}")
    flower_workers = await celery_worker.check_worker_status()

    # Keep the worker data in the same frontend format as the status endpoint.
    workers = flower_workers.get("workers")
    if workers and workers.get("data"):
        workers["data"] = transform_workers_data(workers["data"])

    if flower_workers.get("err") == 1:
        raise HTTPException(status_code=500, detail=f"Flower workers error: {flower_workers.get('msg')}")

    return {"status": "success", "flower_workers": flower_workers}
async def health_check():
    """Health check against the worker service.

    Sends ``GET /health`` to ``WORKER_SERVICE_URL`` and returns the decoded
    JSON body with ``"err": 0`` added to it.

    Raises:
        httpx.HTTPStatusError: when the service answers with an error status
            (raised by ``raise_for_status``). Connection or decoding errors
            from ``httpx`` also propagate to the caller.
    """
    async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as http:
        reply = await http.get("/health")
        reply.raise_for_status()

        payload = reply.json()
        # Mark the payload as successful for consumers that check "err".
        payload.update(err=0)
        return payload
|