"""Worker-control API: browser config, worker status/control and keyword deletion."""
import json
import os
import pathlib
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import quote

import httpx
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

# Project-local imports kept in their original relative order.
from utils.config import WORKER_SERVICE_URL, config, Browser
from src.services.celery_worker import celery_worker
from utils.logu import logger

router = APIRouter()


class Endpoint(BaseModel):
    """Connection details of the worker service plus its last health-check result."""

    service_url: str = WORKER_SERVICE_URL
    # NOTE(review): hard-coded placeholder token that is echoed back by /status;
    # confirm it is not a real credential before exposing this endpoint.
    token: Optional[str] = "temp_token_123"
    # Payload from health_check(), or {"err": 1, "msg": ...} when the check failed.
    # pydantic copies mutable defaults per instance, so this {} is not shared.
    health: Optional[Dict] = {}


class PathValidationRequest(BaseModel):
    """Request body carrying a file path to validate."""

    file_path: str


class PathValidationResponse(BaseModel):
    """Result of validating a file path."""

    valid: bool
    message: str
    file_name: Optional[str] = None
    file_size: Optional[int] = None


class ResponseStatus(BaseModel):
    """Aggregated payload returned by GET /status."""

    endpoint: Endpoint = Endpoint()
    browser_config: Optional[Browser] = None
    celery_status: Optional[Dict] = {}


def transform_workers_data(workers_data: list) -> list:
    """Project each worker dict onto the fields the frontend needs.

    Missing keys become ``None`` (via ``dict.get``).
    """
    return [
        {
            "hostname": worker.get("hostname"),
            "worker-online": worker.get("worker-online"),
            "pid": worker.get("pid"),
            "active": worker.get("active"),
            "processed": worker.get("processed"),
            "status": worker.get("status"),
        }
        for worker in workers_data
    ]


async def get_proxy_pool() -> Optional[list[str]]:
    """Fetch the proxy pool from the backend's ``/api/proxy/proxies-pool`` endpoint.

    Returns:
        The ``"proxies"`` value from the JSON response, or ``None`` if the key is
        missing or any error occurs (the error is logged, not raised).
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"http://{config.backend.host}:{config.backend.port}/api/proxy/proxies-pool",
                params={"force_refresh": False},
            )
            response.raise_for_status()
            data = response.json()
            return data.get("proxies", None)
    except Exception as e:
        logger.error(f"获取代理池失败: {str(e)}")
        return None


def _error_response(
    status_code: int, message: str, detail: Optional[str] = None
) -> JSONResponse:
    """Build the ``{"status": "error", "message": ..., ["detail": ...]}`` response."""
    content = {"status": "error", "message": message}
    if detail is not None:
        content["detail"] = detail
    return JSONResponse(status_code=status_code, content=content)


def _normalize_exe_path(raw: Optional[str]) -> str:
    """Trim whitespace and one pair of surrounding quotes from a user-supplied path.

    Windows "Copy as path" wraps paths in double quotes, hence the unquoting.
    ``None`` is treated as an empty path.
    """
    exe_path = (raw or "").strip()
    if len(exe_path) >= 2 and exe_path[0] == exe_path[-1] and exe_path[0] in "\"'":
        exe_path = exe_path[1:-1].strip()
    return exe_path


def _check_browser_exe(path: Path) -> Optional[tuple[str, str]]:
    """Return ``(message, detail)`` for the first failed check, or ``None`` if usable.

    The checks run in this order: the path exists, it is a regular file, it is
    executable, and it has an ``.exe`` suffix.
    """
    if not path.exists():
        return "路径不存在", "请检查浏览器可执行文件路径是否正确"
    if not path.is_file():
        return "路径不是文件", "请选择一个可执行文件而不是目录"
    if not os.access(path, os.X_OK):
        return "文件不可执行", "请检查文件权限或是否损坏"
    if path.suffix.lower() != '.exe':
        return "仅支持可执行文件 (.exe)", "请选择扩展名为.exe的浏览器可执行文件"
    return None


@router.post("/browser_config", tags=["worker"])
async def update_browser_config(new_config: Browser):
    """Validate the browser executable path in *new_config*, then persist it.

    Returns:
        ``{"status": "success", "browser_config": ...}`` on success. Otherwise a
        400 JSON error describing the failed path check, or a 500 JSON error if
        validation or saving raised.
    """
    exe_path = _normalize_exe_path(new_config.exe_path)
    if not exe_path:
        return _error_response(400, "路径不存在", "请检查浏览器可执行文件路径是否正确")
    path = Path(exe_path)

    try:
        problem = _check_browser_exe(path)
    except Exception as e:
        logger.error(f"浏览器路径验证失败: {str(e)}")
        return _error_response(500, f"路径验证异常: {str(e)}")
    if problem is not None:
        message, detail = problem
        return _error_response(400, message, detail)

    new_config.exe_path = str(path)
    previous = config.browser
    config.browser = new_config
    try:
        config.save()
    except Exception as e:
        # Roll back so the in-memory config does not diverge from what was persisted.
        config.browser = previous
        logger.error(f"保存浏览器配置失败: {str(e)}")
        return _error_response(500, f"保存配置失败: {str(e)}")
    return {"status": "success", "browser_config": config.browser}


@router.post("/reset_browser_config", tags=["worker"])
async def reset_browser_config():
    """Reset the browser config to ``Browser()`` defaults and save it."""
    config.browser = Browser()
    config.save()
    return {"status": "success", "browser_config": config.browser}


@router.get("/status", tags=["worker"])
async def status() -> ResponseStatus:
    """Aggregate worker-service health, current browser config and Celery state.

    A failing health check is reported inside ``endpoint.health`` rather than
    failing the request. Errors from ``celery_worker`` are not caught here.
    """
    try:
        health = await health_check()
    except Exception as e:
        logger.error(e)
        health = {"err": 1, "msg": str(e)}

    celery_status = await celery_worker.check_status()
    queue_lengths = await celery_worker.get_queue_lengths()

    # Reshape per-worker records for the frontend (mutates celery_status in place).
    workers = celery_status.get("workers")
    if workers and workers.get("data"):
        workers["data"] = transform_workers_data(workers["data"])

    celery_status["queue_lengths"] = queue_lengths

    return ResponseStatus(
        endpoint=Endpoint(health=health),
        browser_config=config.browser,
        celery_status=celery_status,
    )


class StartupRequest(BaseModel):
    """Body of POST /ctrl: which worker/queue to act on and how."""

    worker_name: str
    # One of: "start", "stop", "clean", "submit", "submit_all".
    action: str
    select_proxy: Optional[str] = ''
    data: Optional[Dict] = {}


@router.post("/ctrl", tags=["worker"])
async def ctrl_worker(request: StartupRequest):
    """Dispatch a control action to ``celery_worker``.

    Raises:
        HTTPException: 400 when ``request.action`` is not a known action.
    """
    logger.info(f"{request.model_dump_json(indent=4)}")

    if request.action == "start":
        worker_model = await celery_worker.start_worker(request.worker_name, in_cmd_windows=True)
    elif request.action == "stop":
        worker_model = await celery_worker.stop_worker(request.worker_name)
    elif request.action == "clean":
        await celery_worker.clear_specific_queue(request.worker_name)
        logger.info(f"clean {request.data}")
        return {"status": "success"}
    elif request.action == "submit":
        res = await celery_worker.submit_tasks(
            request.worker_name, request.data, select_proxy=request.select_proxy
        )
        return {"status": "success", "res": res}
    elif request.action == "submit_all":
        res = await celery_worker.submit_all_tasks(
            request.worker_name, request.data, select_proxy=request.select_proxy
        )
        return {"status": "success", "res": res}
    else:
        logger.error(f"Invalid action: {request.action}")
        raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")

    # Only "start" and "stop" reach here.
    return {"status": "success", "worker_model": worker_model}


async def health_check():
    """GET ``/health`` on the worker service and return its JSON with ``err=0`` added.

    Raises:
        httpx.HTTPError: on transport failure or a non-2xx response.
    """
    async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
        response = await client.get("/health")
        response.raise_for_status()
        ret = response.json()
        ret.update({"err": 0})
        return ret


@router.delete("/keywords/{keyword}", summary="删除关键词", tags=["worker"])
async def delete_keyword(keyword: str):
    """Proxy a keyword deletion to the worker service.

    DELETE: API_BASE_URL/worker/keywords/{keyword}

    The upstream ``err`` field is mapped as follows: 0 becomes 200, and 1 (keyword
    not found) becomes 404. Any other value becomes 500. Transport errors, non-2xx
    upstream responses and non-JSON bodies become 502.
    """
    # Percent-encode so characters like '?', '#', '%' or spaces stay inside the
    # path segment instead of changing the upstream URL's query/fragment.
    upstream_path = f"/keywords/{quote(keyword, safe='')}"
    logger.info(f"DELETE: {WORKER_SERVICE_URL}{upstream_path}")

    try:
        async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
            response = await client.delete(upstream_path)
            response.raise_for_status()
            ret = response.json()
    except (httpx.HTTPError, ValueError) as e:
        # ValueError covers json.JSONDecodeError from a non-JSON body.
        logger.error(f"删除关键词请求失败: {str(e)}")
        return JSONResponse(
            status_code=502,
            content={
                "err": 2,
                "status": "error",
                "message": f"Worker 服务请求失败: {str(e)}",
            },
        )

    logger.info(f"response {ret}")
    if not isinstance(ret, dict):
        ret = {}

    if ret.get("err") == 0:
        return JSONResponse(
            status_code=200,
            content={
                "err": 0,
                "status": "success",
                "message": f"关键词 '{keyword}' 删除成功",
                "model": ret,
            },
        )
    if ret.get("err") == 1:
        return JSONResponse(
            status_code=404,
            content={
                "err": 1,
                "status": "error",
                "message": f"关键词 '{keyword}' 不存在",
            },
        )
    return JSONResponse(
        status_code=500,
        content={
            "err": ret.get("err", 2),
            "status": "error",
            "message": ret.get("message", "服务器内部错误"),
        },
    )