# Source: worker.py (8.0 KB)
import json
import os
import pathlib
from pathlib import Path
from typing import Dict, List
from typing import Optional
from urllib.parse import quote

import httpx
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

from src.services.celery_worker import celery_worker
from utils.config import WORKER_SERVICE_URL, config, Browser
from utils.logu import logger
  15. router = APIRouter()
  16. class Endpoint(BaseModel):
  17. service_url: str = WORKER_SERVICE_URL
  18. token: Optional[str] = "temp_token_123"
  19. health: Optional[Dict] = {}
  20. class PathValidationRequest(BaseModel):
  21. file_path: str
  22. class PathValidationResponse(BaseModel):
  23. valid: bool
  24. message: str
  25. file_name: Optional[str] = None
  26. file_size: Optional[int] = None
  27. class ResponseStatus(BaseModel):
  28. endpoint: Endpoint = Endpoint()
  29. browser_config: Optional[Browser] = None
  30. celery_status: Optional[Dict] = {}
  31. def transform_workers_data(workers_data: list) -> list:
  32. """转换worker数据为前端需要的格式"""
  33. return [
  34. {
  35. "hostname": worker.get("hostname"),
  36. "worker-online": worker.get("worker-online"),
  37. "pid": worker.get("pid"),
  38. "active": worker.get("active"),
  39. "processed": worker.get("processed"),
  40. "status": worker.get("status")
  41. }
  42. for worker in workers_data
  43. ]
  44. async def get_proxy_pool() -> Optional[list[str]]:
  45. """获取代理池数据"""
  46. try:
  47. async with httpx.AsyncClient() as client:
  48. response = await client.get(
  49. f"http://{config.backend.host}:{config.backend.port}/api/proxy/proxies-pool",
  50. params={"force_refresh": False}
  51. )
  52. response.raise_for_status()
  53. data = response.json()
  54. return data.get("proxies", None)
  55. except Exception as e:
  56. logger.error(f"获取代理池失败: {str(e)}")
  57. return None
  58. @router.post("/browser_config", tags=["worker"])
  59. async def update_browser_config(new_config: Browser):
  60. """更新浏览器配置(包含自动路径校验)"""
  61. exe_path = new_config.exe_path
  62. # 自动处理路径格式
  63. if exe_path.startswith('"') and exe_path.endswith('"'):
  64. exe_path = exe_path.strip('"')
  65. path = Path(exe_path)
  66. try:
  67. if not path.exists():
  68. return JSONResponse(status_code=400, content={
  69. "status": "error",
  70. "message": "路径不存在",
  71. "detail": "请检查浏览器可执行文件路径是否正确"
  72. })
  73. if not path.is_file():
  74. return JSONResponse(status_code=400, content={
  75. "status": "error",
  76. "message": "路径不是文件",
  77. "detail": "请选择一个可执行文件而不是目录"
  78. })
  79. if not os.access(path, os.X_OK):
  80. return JSONResponse(status_code=400, content={
  81. "status": "error",
  82. "message": "文件不可执行",
  83. "detail": "请检查文件权限或是否损坏"
  84. })
  85. if path.suffix.lower() != '.exe':
  86. return JSONResponse(status_code=400, content={
  87. "status": "error",
  88. "message": "仅支持可执行文件 (.exe)",
  89. "detail": "请选择扩展名为.exe的浏览器可执行文件"
  90. })
  91. new_config.exe_path = str(path)
  92. # 校验通过后保存配置
  93. config.browser = new_config
  94. config.save()
  95. return {"status": "success", "browser_config": config.browser}
  96. except Exception as e:
  97. logger.error(f"浏览器路径验证失败: {str(e)}")
  98. return JSONResponse(status_code=500, content={
  99. "status": "error",
  100. "message": f"路径验证异常: {str(e)}"
  101. })
  102. @router.post("/reset_browser_config", tags=["worker"])
  103. async def reset_browser_config():
  104. """重置浏览器配置为默认"""
  105. config.browser = Browser()
  106. config.save()
  107. return {"status": "success", "browser_config": config.browser}
  108. @router.get("/status", tags=["worker"])
  109. async def status() -> ResponseStatus:
  110. global config,celery_worker
  111. try:
  112. health = await health_check()
  113. except Exception as e:
  114. logger.error(e)
  115. health = {"err": 1, "msg": str(e)}
  116. celery_status = await celery_worker.check_status()
  117. queue_lengths = await celery_worker.get_queue_lengths()
  118. # 数据转换处理
  119. if celery_status.get("workers") and celery_status["workers"].get("data"):
  120. celery_status["workers"]["data"] = transform_workers_data(
  121. celery_status["workers"]["data"]
  122. )
  123. # 添加队列长度信息
  124. celery_status["queue_lengths"] = queue_lengths
  125. return ResponseStatus(
  126. endpoint=Endpoint(health=health),
  127. browser_config=config.browser,
  128. celery_status=celery_status
  129. )
  130. class StartupRequest(BaseModel):
  131. worker_name: str
  132. action: str
  133. select_proxy: Optional[str] = ''
  134. data: Optional[Dict] = {}
  135. @router.post("/ctrl", tags=["worker"])
  136. async def ctrl_worker(request: StartupRequest):
  137. global celery_worker
  138. logger.info(f"{request.model_dump_json(indent=4)}")
  139. if request.action == "start":
  140. worker_model = await celery_worker.start_worker(request.worker_name, in_cmd_windows=True)
  141. elif request.action == "stop":
  142. worker_model = await celery_worker.stop_worker(request.worker_name)
  143. elif request.action == "clean":
  144. await celery_worker.clear_specific_queue(request.worker_name)
  145. logger.info(f"clean {request.data}")
  146. return {"status": "success"}
  147. elif request.action == "submit":
  148. res = await celery_worker.submit_tasks(
  149. request.worker_name,
  150. request.data,
  151. select_proxy=request.select_proxy
  152. )
  153. return {"status": "success", "res": res}
  154. elif request.action == "submit_all":
  155. res = await celery_worker.submit_all_tasks(
  156. request.worker_name,
  157. request.data,
  158. select_proxy=request.select_proxy
  159. )
  160. return {"status": "success", "res": res}
  161. else:
  162. logger.error(f"Invalid action: {request.action}")
  163. raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")
  164. return {"status": "success", "worker_model": worker_model}
  165. async def health_check():
  166. """健康检查"""
  167. async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
  168. response = await client.get("/health")
  169. response.raise_for_status()
  170. ret = response.json()
  171. ret.update({"err": 0})
  172. return ret
  173. @router.delete("/keywords/{keyword}", summary="删除关键词")
  174. async def delete_keyword(keyword: str):
  175. '''
  176. DELETE: API_BASE_URL/worker/keywords/{keyword}
  177. '''
  178. async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
  179. # 发送 DELETE 请求
  180. logger.info(f"DELETE: {WORKER_SERVICE_URL}/keywords/{keyword}")
  181. response = await client.delete(f"/keywords/{keyword}")
  182. # 检查响应状态码,如果请求失败则抛出异常
  183. response.raise_for_status()
  184. ret = response.json()
  185. logger.info(f"response {ret}")
  186. if ret.get("err") == 0:
  187. return JSONResponse(
  188. status_code=200,
  189. content={
  190. "err": 0,
  191. "status": "success",
  192. "message": f"关键词 '{keyword}' 删除成功",
  193. "model": ret
  194. }
  195. )
  196. elif ret.get("err") == 1:
  197. return JSONResponse(
  198. status_code=404,
  199. content={
  200. "err": 1,
  201. "status": "error",
  202. "message": f"关键词 '{keyword}' 不存在"
  203. }
  204. )
  205. return JSONResponse(
  206. status_code=500,
  207. content={
  208. "err": ret.get("err", 2),
  209. "status": "error",
  210. "message": ret.get("message", "服务器内部错误")
  211. }
  212. )