worker.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from fastapi import APIRouter, HTTPException, Request, Depends
  2. from fastapi.responses import JSONResponse, StreamingResponse
  3. import httpx
  4. import os
  5. from pathlib import Path
  6. from typing import Dict
  7. from pydantic import BaseModel
  8. from typing import Optional
  9. import pathlib
  10. import os
  11. from utils.config import WORKER_SERVICE_URL,config,Browser
  12. from src.services.celery_worker import CeleryWorker
  13. from utils.logu import logger
  14. router = APIRouter()
  15. class Endpoint(BaseModel):
  16. service_url: str = WORKER_SERVICE_URL
  17. token: Optional[str] = "temp_token_123"
  18. health: Optional[Dict] = {}
  19. class PathValidationRequest(BaseModel):
  20. file_path: str
  21. class PathValidationResponse(BaseModel):
  22. valid: bool
  23. message: str
  24. file_name: Optional[str] = None
  25. file_size: Optional[int] = None
  26. class ResponseStatus(BaseModel):
  27. endpoint: Endpoint = Endpoint()
  28. browser_config: Optional[Browser] = None
  29. celery_status: Optional[Dict] = {}
  30. @router.post("/browser_config", tags=["worker"])
  31. async def update_browser_config(new_config: Browser):
  32. """更新浏览器配置(包含自动路径校验)"""
  33. exe_path = new_config.exe_path
  34. # 自动处理路径格式
  35. if exe_path.startswith('"') and exe_path.endswith('"'):
  36. exe_path = exe_path.strip('"')
  37. path = Path(exe_path)
  38. try:
  39. if not path.exists():
  40. return JSONResponse(status_code=400, content={
  41. "status": "error",
  42. "message": "路径不存在",
  43. "detail": "请检查浏览器可执行文件路径是否正确"
  44. })
  45. if not path.is_file():
  46. return JSONResponse(status_code=400, content={
  47. "status": "error",
  48. "message": "路径不是文件",
  49. "detail": "请选择一个可执行文件而不是目录"
  50. })
  51. if not os.access(path, os.X_OK):
  52. return JSONResponse(status_code=400, content={
  53. "status": "error",
  54. "message": "文件不可执行",
  55. "detail": "请检查文件权限或是否损坏"
  56. })
  57. if path.suffix.lower() != '.exe':
  58. return JSONResponse(status_code=400, content={
  59. "status": "error",
  60. "message": "仅支持可执行文件 (.exe)",
  61. "detail": "请选择扩展名为.exe的浏览器可执行文件"
  62. })
  63. new_config.exe_path = str(path)
  64. # 校验通过后保存配置
  65. config.browser = new_config
  66. config.save()
  67. return {"status": "success", "browser_config": config.browser}
  68. except Exception as e:
  69. logger.error(f"浏览器路径验证失败: {str(e)}")
  70. return JSONResponse(status_code=500, content={
  71. "status": "error",
  72. "message": f"路径验证异常: {str(e)}"
  73. })
  74. @router.post("/reset_browser_config", tags=["worker"])
  75. async def reset_browser_config():
  76. """重置浏览器配置为默认"""
  77. config.browser = Browser()
  78. config.save()
  79. return {"status": "success", "browser_config": config.browser}
  80. @router.get("/status", tags=["worker"])
  81. async def status():
  82. global config
  83. try:
  84. health = await health_check()
  85. except Exception as e:
  86. logger.error(e)
  87. health = {"err": 1, "msg": str(e)}
  88. celery_worker = CeleryWorker()
  89. celery_status = await celery_worker.check_worker_status()
  90. """获取当前请求的端点"""
  91. return ResponseStatus(endpoint=Endpoint(health=health), browser_config=config.browser, celery_status=celery_status)
  92. async def health_check():
  93. """健康检查"""
  94. async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
  95. response = await client.get("/health")
  96. response.raise_for_status()
  97. ret = response.json()
  98. ret.update({"err": 0})
  99. return ret