worker.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import subprocess
  2. from fastapi import APIRouter, HTTPException, Request, Depends
  3. from fastapi.responses import JSONResponse, StreamingResponse
  4. import httpx
  5. import os
  6. from pathlib import Path
  7. from typing import Dict
  8. from pydantic import BaseModel
  9. from typing import Optional
  10. import pathlib
  11. import os
  12. from utils.config import WORKER_SERVICE_URL,config,Browser
  13. from src.services.celery_worker import celery_worker
  14. from utils.logu import logger
  15. router = APIRouter()
  16. class Endpoint(BaseModel):
  17. service_url: str = WORKER_SERVICE_URL
  18. token: Optional[str] = "temp_token_123"
  19. health: Optional[Dict] = {}
  20. class PathValidationRequest(BaseModel):
  21. file_path: str
  22. class PathValidationResponse(BaseModel):
  23. valid: bool
  24. message: str
  25. file_name: Optional[str] = None
  26. file_size: Optional[int] = None
  27. class ResponseStatus(BaseModel):
  28. endpoint: Endpoint = Endpoint()
  29. browser_config: Optional[Browser] = None
  30. celery_status: Optional[Dict] = {}
  31. def transform_workers_data(workers_data: list) -> list:
  32. """转换worker数据为前端需要的格式"""
  33. return [
  34. {
  35. "hostname": worker.get("hostname"),
  36. "worker-online": worker.get("worker-online"),
  37. "pid": worker.get("pid"),
  38. "active": worker.get("active"),
  39. "processed": worker.get("processed"),
  40. "status": worker.get("status")
  41. }
  42. for worker in workers_data
  43. ]
  44. @router.post("/browser_config", tags=["worker"])
  45. async def update_browser_config(new_config: Browser):
  46. """更新浏览器配置(包含自动路径校验)"""
  47. exe_path = new_config.exe_path
  48. # 自动处理路径格式
  49. if exe_path.startswith('"') and exe_path.endswith('"'):
  50. exe_path = exe_path.strip('"')
  51. path = Path(exe_path)
  52. try:
  53. if not path.exists():
  54. return JSONResponse(status_code=400, content={
  55. "status": "error",
  56. "message": "路径不存在",
  57. "detail": "请检查浏览器可执行文件路径是否正确"
  58. })
  59. if not path.is_file():
  60. return JSONResponse(status_code=400, content={
  61. "status": "error",
  62. "message": "路径不是文件",
  63. "detail": "请选择一个可执行文件而不是目录"
  64. })
  65. if not os.access(path, os.X_OK):
  66. return JSONResponse(status_code=400, content={
  67. "status": "error",
  68. "message": "文件不可执行",
  69. "detail": "请检查文件权限或是否损坏"
  70. })
  71. if path.suffix.lower() != '.exe':
  72. return JSONResponse(status_code=400, content={
  73. "status": "error",
  74. "message": "仅支持可执行文件 (.exe)",
  75. "detail": "请选择扩展名为.exe的浏览器可执行文件"
  76. })
  77. new_config.exe_path = str(path)
  78. # 校验通过后保存配置
  79. config.browser = new_config
  80. config.save()
  81. return {"status": "success", "browser_config": config.browser}
  82. except Exception as e:
  83. logger.error(f"浏览器路径验证失败: {str(e)}")
  84. return JSONResponse(status_code=500, content={
  85. "status": "error",
  86. "message": f"路径验证异常: {str(e)}"
  87. })
  88. @router.post("/reset_browser_config", tags=["worker"])
  89. async def reset_browser_config():
  90. """重置浏览器配置为默认"""
  91. config.browser = Browser()
  92. config.save()
  93. return {"status": "success", "browser_config": config.browser}
  94. @router.get("/status", tags=["worker"])
  95. async def status() -> ResponseStatus:
  96. global config,celery_worker
  97. try:
  98. health = await health_check()
  99. except Exception as e:
  100. logger.error(e)
  101. health = {"err": 1, "msg": str(e)}
  102. celery_status = await celery_worker.check_worker_status()
  103. # 数据转换处理
  104. if celery_status.get("workers") and celery_status["workers"].get("data"):
  105. celery_status["workers"]["data"] = transform_workers_data(
  106. celery_status["workers"]["data"]
  107. )
  108. logger.info(f"Filtered celery_status: {celery_status}")
  109. return ResponseStatus(
  110. endpoint=Endpoint(health=health),
  111. browser_config=config.browser,
  112. celery_status=celery_status
  113. )
  114. class StartupRequest(BaseModel):
  115. worker_name: str
  116. action: str
  117. data: Optional[Dict] = {}
  118. @router.post("/ctrl", tags=["worker"])
  119. async def start_worker(request: StartupRequest):
  120. global celery_worker
  121. if request.action == "start":
  122. await celery_worker.start_worker(request.worker_name)
  123. elif request.action == "stop":
  124. await celery_worker.stop_worker(request.worker_name)
  125. elif request.action == "clean":
  126. logger.info(f"clean {request.data}")
  127. else:
  128. raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")
  129. logger.info(f"{request.action} {request.worker_name}")
  130. flower_workers = await celery_worker.check_worker_status()
  131. # 保持数据转换的一致性
  132. if flower_workers.get("workers") and flower_workers["workers"].get("data"):
  133. flower_workers["workers"]["data"] = transform_workers_data(
  134. flower_workers["workers"]["data"]
  135. )
  136. if flower_workers.get("err") == 1:
  137. raise HTTPException(status_code=500, detail=f"Flower workers error: {flower_workers.get('msg')}")
  138. return {"status": "success", "flower_workers": flower_workers}
  139. async def health_check():
  140. """健康检查"""
  141. async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
  142. response = await client.get("/health")
  143. response.raise_for_status()
  144. ret = response.json()
  145. ret.update({"err": 0})
  146. return ret