| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- import json
- from fastapi import APIRouter, HTTPException, Request, Depends
- from fastapi.responses import JSONResponse, StreamingResponse
- import httpx
- import os
- from pathlib import Path
- from typing import Dict, List
- from pydantic import BaseModel
- from typing import Optional
- import pathlib
- import os
- from utils.config import WORKER_SERVICE_URL,config,Browser
- from src.services.celery_worker import celery_worker
- from utils.logu import logger
# Router that collects the worker-management endpoints declared in this module.
router = APIRouter()
class Endpoint(BaseModel):
    """Address, token and health information for the worker service."""

    # Base URL of the worker service; defaults to the imported WORKER_SERVICE_URL.
    service_url: str = WORKER_SERVICE_URL
    # NOTE(review): hard-coded default token; looks like a placeholder -- confirm
    # it is not meant to be a real credential.
    token: Optional[str] = "temp_token_123"
    # Health payload; the /status handler passes in the health_check() result,
    # or {"err": 1, "msg": ...} when that check raises.
    health: Optional[Dict] = {}
class PathValidationRequest(BaseModel):
    """Request body carrying a single file path."""

    # Presumably the path to be validated (no consumer is visible here); verify against the caller.
    file_path: str
class PathValidationResponse(BaseModel):
    """Result of a path validation: a verdict, a message and optional file details."""

    # Validation verdict.
    valid: bool
    # Message accompanying the verdict.
    message: str
    # Optional file name; defaults to None.
    file_name: Optional[str] = None
    # Optional file size; defaults to None. Unit is not shown here -- presumably bytes, TODO confirm.
    file_size: Optional[int] = None
class ResponseStatus(BaseModel):
    """Response model returned by the /status endpoint."""

    # Worker-service endpoint info; defaults to an Endpoint built from its own defaults.
    endpoint: Endpoint = Endpoint()
    # Current browser configuration (the /status handler passes config.browser).
    browser_config: Optional[Browser] = None
    # Celery status dict; the /status handler adds a "queue_lengths" entry to it.
    celery_status: Optional[Dict] = {}
-
def transform_workers_data(workers_data: list) -> list:
    """Convert raw worker records into the format the frontend needs.

    Every record is reduced to a fixed set of keys. A key that is missing
    from the source record is emitted with the value ``None``.

    Args:
        workers_data: Worker records; each one must provide ``.get(key)``.

    Returns:
        A new list holding one dict per input record, in input order.
    """
    exposed_keys = (
        "hostname",
        "worker-online",
        "pid",
        "active",
        "processed",
        "status",
    )
    slimmed = []
    for record in workers_data:
        slimmed.append({key: record.get(key) for key in exposed_keys})
    return slimmed
async def get_proxy_pool() -> Optional[list[str]]:
    """Fetch the proxy pool from the backend API.

    Issues a GET to ``/api/proxy/proxies-pool`` on the configured backend
    host and port with ``force_refresh`` set to ``False``.

    Returns:
        The value stored under ``"proxies"`` in the JSON reply, or ``None``
        when that key is absent or when anything in the request fails
        (the failure is logged, not raised).
    """
    try:
        pool_url = f"http://{config.backend.host}:{config.backend.port}/api/proxy/proxies-pool"
        async with httpx.AsyncClient() as client:
            reply = await client.get(pool_url, params={"force_refresh": False})
            reply.raise_for_status()
            payload = reply.json()
            return payload.get("proxies")
    except Exception as e:
        # Best-effort lookup: callers get None instead of an exception.
        logger.error(f"获取代理池失败: {str(e)}")
        return None
@router.post("/browser_config", tags=["worker"])
async def update_browser_config(new_config: Browser):
    """Update the browser configuration after validating the executable path.

    The path is accepted only if it exists, is a regular file, is
    executable, and has an ``.exe`` suffix. The checks run in that order
    and the first failure produces a 400 response. On success the
    normalised path is written back to ``new_config``, which is stored in
    ``config.browser`` and saved.

    Args:
        new_config: Browser settings submitted by the client.

    Returns:
        A success dict with the saved browser config, or a ``JSONResponse``
        with status 400 (validation failure) or 500 (unexpected exception).
    """
    raw_path = new_config.exe_path
    # Drop surrounding double quotes when the path arrives quoted.
    if raw_path.startswith('"') and raw_path.endswith('"'):
        raw_path = raw_path.strip('"')

    candidate = Path(raw_path)

    # (predicate, message, detail) -- evaluated lazily, in order; first failure wins.
    checks = (
        (candidate.exists, "路径不存在", "请检查浏览器可执行文件路径是否正确"),
        (candidate.is_file, "路径不是文件", "请选择一个可执行文件而不是目录"),
        (lambda: os.access(candidate, os.X_OK), "文件不可执行", "请检查文件权限或是否损坏"),
        (
            lambda: candidate.suffix.lower() == '.exe',
            "仅支持可执行文件 (.exe)",
            "请选择扩展名为.exe的浏览器可执行文件",
        ),
    )

    try:
        for passes, message, detail in checks:
            if not passes():
                return JSONResponse(status_code=400, content={
                    "status": "error",
                    "message": message,
                    "detail": detail
                })

        # All checks passed: persist the configuration.
        new_config.exe_path = str(candidate)
        config.browser = new_config
        config.save()
        return {"status": "success", "browser_config": config.browser}
    except Exception as e:
        logger.error(f"浏览器路径验证失败: {str(e)}")
        return JSONResponse(status_code=500, content={
            "status": "error",
            "message": f"路径验证异常: {str(e)}"
        })
@router.post("/reset_browser_config", tags=["worker"])
async def reset_browser_config():
    """Reset the browser configuration to its defaults.

    Replaces ``config.browser`` with a freshly constructed ``Browser()`` and
    saves the configuration.

    Returns:
        A success dict containing the browser config now stored in ``config``.
    """
    default_browser = Browser()
    config.browser = default_browser
    config.save()
    return {"status": "success", "browser_config": config.browser}
@router.get("/status", tags=["worker"])
async def status() -> ResponseStatus:
    """Report worker-service health, browser config and Celery status.

    A failing health check does not fail the request: the error is logged
    and reported as ``{"err": 1, "msg": ...}``. Exceptions from the Celery
    status or queue-length calls are not caught here.

    Returns:
        A ``ResponseStatus`` whose ``celery_status`` carries the
        frontend-shaped worker list (when present) and a ``queue_lengths``
        entry.
    """
    try:
        health = await health_check()
    except Exception as exc:
        logger.error(exc)
        health = {"err": 1, "msg": str(exc)}

    celery_status = await celery_worker.check_status()
    queue_lengths = await celery_worker.get_queue_lengths()

    # Reshape the worker records for the frontend, only when there are any.
    workers = celery_status.get("workers")
    if workers and workers.get("data"):
        workers["data"] = transform_workers_data(workers["data"])

    # Attach the queue lengths to the status payload.
    celery_status["queue_lengths"] = queue_lengths

    return ResponseStatus(
        endpoint=Endpoint(health=health),
        browser_config=config.browser,
        celery_status=celery_status,
    )
class StartupRequest(BaseModel):
    """Request body for the /ctrl endpoint."""

    # Worker (or queue) name passed to the celery_worker call selected by `action`.
    worker_name: str
    # Operation to perform; ctrl_worker handles "start", "stop", "clean",
    # "submit" and "submit_all" and rejects anything else with HTTP 400.
    action: str
    # Forwarded as `select_proxy` to the submit actions; defaults to an empty string.
    select_proxy: Optional[str] = ''
    # Payload forwarded to the submit actions; defaults to an empty dict.
    data: Optional[Dict] = {}
@router.post("/ctrl", tags=["worker"])
async def ctrl_worker(request: StartupRequest):
    """Dispatch a control action to the Celery worker manager.

    Supported actions:
        * ``start`` / ``stop`` -- start or stop the named worker; the
          returned model is included under ``worker_model``.
        * ``clean`` -- clear the queue identified by ``worker_name``.
        * ``submit`` / ``submit_all`` -- submit tasks with ``request.data``
          and ``request.select_proxy``; the result is returned under ``res``.

    Args:
        request: The control request.

    Returns:
        A dict with ``"status": "success"`` plus the action-specific field.

    Raises:
        HTTPException: 400 when ``request.action`` is not a supported action.
    """
    logger.info(f"{request.model_dump_json(indent=4)}")
    action = request.action
    target = request.worker_name

    if action == "start":
        worker_model = await celery_worker.start_worker(target, in_cmd_windows=True)
        return {"status": "success", "worker_model": worker_model}

    if action == "stop":
        worker_model = await celery_worker.stop_worker(target)
        return {"status": "success", "worker_model": worker_model}

    if action == "clean":
        await celery_worker.clear_specific_queue(target)
        logger.info(f"clean {request.data}")
        return {"status": "success"}

    if action in ("submit", "submit_all"):
        if action == "submit":
            submit = celery_worker.submit_tasks
        else:
            submit = celery_worker.submit_all_tasks
        res = await submit(target, request.data, select_proxy=request.select_proxy)
        return {"status": "success", "res": res}

    logger.error(f"Invalid action: {request.action}")
    raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")
async def health_check():
    """Query the worker service's ``/health`` endpoint.

    Returns:
        The decoded JSON reply with ``"err": 0`` added to it.

    Raises:
        httpx.HTTPStatusError: When the worker service answers with a
            4xx/5xx status (via ``raise_for_status``). Other request
            errors from ``httpx`` also propagate.
    """
    async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
        reply = await client.get("/health")
        reply.raise_for_status()
        payload = reply.json()
        # Mark the result as healthy in the same err/msg convention the caller uses.
        payload.update(err=0)
        return payload
@router.delete("/keywords/{keyword}", summary="删除关键词")
async def delete_keyword(keyword: str):
    """Delete a keyword by forwarding the request to the worker service.

    Route: ``DELETE API_BASE_URL/worker/keywords/{keyword}``

    The keyword is percent-encoded before it is placed in the downstream
    URL path, so characters such as ``?``, ``#`` or ``%`` are sent as part
    of the keyword rather than changing the structure of the request.

    Args:
        keyword: The keyword to delete, as received in the request path.

    Returns:
        A ``JSONResponse`` chosen from the worker reply's ``err`` field:
        200 when ``err == 0``, 404 when ``err == 1``, otherwise 500 with
        the worker's ``err`` and ``message`` (defaults applied when absent).

    Raises:
        httpx.HTTPStatusError: When the worker service answers with a
            4xx/5xx status (via ``raise_for_status``). Other request
            errors from ``httpx`` also propagate.
    """
    from urllib.parse import quote

    # safe="" also encodes "/", so the keyword always stays one path segment.
    encoded_keyword = quote(keyword, safe="")

    async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
        logger.info(f"DELETE: {WORKER_SERVICE_URL}/keywords/{keyword}")
        response = await client.delete(f"/keywords/{encoded_keyword}")

        # Raise if the worker service reported an HTTP-level failure.
        response.raise_for_status()
        ret = response.json()

    logger.info(f"response {ret}")
    err = ret.get("err")

    if err == 0:
        return JSONResponse(
            status_code=200,
            content={
                "err": 0,
                "status": "success",
                "message": f"关键词 '{keyword}' 删除成功",
                "model": ret,
            },
        )

    if err == 1:
        return JSONResponse(
            status_code=404,
            content={
                "err": 1,
                "status": "error",
                "message": f"关键词 '{keyword}' 不存在",
            },
        )

    # Any other (or missing) error code is reported as an internal error.
    return JSONResponse(
        status_code=500,
        content={
            "err": ret.get("err", 2),
            "status": "error",
            "message": ret.get("message", "服务器内部错误"),
        },
    )
|