worker.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. from fastapi import APIRouter, HTTPException, Request, Depends
  2. from fastapi.responses import JSONResponse, StreamingResponse
  3. import httpx
  4. import os
  5. from pathlib import Path
  6. from typing import Dict
  7. from pydantic import BaseModel
  8. from typing import Optional
  9. import pathlib
  10. import os
  11. from utils.config import WORKER_SERVICE_URL,config,Browser
  12. from utils.logu import logger
  13. router = APIRouter()
  14. class Endpoint(BaseModel):
  15. service_url: str = WORKER_SERVICE_URL
  16. token: Optional[str] = "temp_token_123"
  17. health: Optional[Dict] = {}
  18. class BrowserPathValidationRequest(BaseModel):
  19. exe_path: str
  20. class PathValidationRequest(BaseModel):
  21. file_path: str
  22. class PathValidationResponse(BaseModel):
  23. valid: bool
  24. message: str
  25. file_name: Optional[str] = None
  26. file_size: Optional[int] = None
  27. class ResponseStatus(BaseModel):
  28. endpoint: Endpoint = Endpoint()
  29. browser_config: Optional[Browser] = None
  30. @router.post("/browser_config", tags=["worker"])
  31. async def update_browser_config(new_config: Browser):
  32. """更新浏览器配置"""
  33. config.browser = new_config
  34. config.save()
  35. return {"status": "success", "browser_config": config.browser}
  36. @router.post("/reset_browser_config", tags=["worker"])
  37. async def reset_browser_config():
  38. """重置浏览器配置为默认"""
  39. config.browser = Browser()
  40. config.save()
  41. return {"status": "success", "browser_config": config.browser}
  42. @router.get("/status", tags=["worker"])
  43. async def status():
  44. global config
  45. try:
  46. health = await health_check()
  47. except Exception as e:
  48. logger.error(e)
  49. health = {"err": 1, "msg": str(e)}
  50. """获取当前请求的端点"""
  51. return ResponseStatus(endpoint=Endpoint(health=health), browser_config=config.browser)
  52. @router.post("/validate_browser_path", tags=["worker"])
  53. async def validate_browser_path(request: BrowserPathValidationRequest):
  54. """验证浏览器路径是否有效"""
  55. path = Path(request.exe_path)
  56. try:
  57. if not path.exists():
  58. return JSONResponse(status_code=400, content={
  59. "valid": False,
  60. "message": "路径不存在"
  61. })
  62. if not path.is_file():
  63. return JSONResponse(status_code=400, content={
  64. "valid": False,
  65. "message": "路径不是文件"
  66. })
  67. if not os.access(path, os.X_OK):
  68. return JSONResponse(status_code=400, content={
  69. "valid": False,
  70. "message": "文件不可执行"
  71. })
  72. if path.suffix.lower() != '.exe':
  73. return JSONResponse(status_code=400, content={
  74. "valid": False,
  75. "message": "仅支持可执行文件 (.exe)"
  76. })
  77. return {
  78. "valid": True,
  79. "message": "浏览器路径有效"
  80. }
  81. except Exception as e:
  82. logger.error(f"浏览器路径验证失败: {str(e)}")
  83. return JSONResponse(status_code=500, content={
  84. "valid": False,
  85. "message": f"路径验证异常: {str(e)}"
  86. })
  87. @router.post("/validate_path", tags=["worker"])
  88. async def validate_file_path(request: PathValidationRequest):
  89. """验证文件路径是否有效"""
  90. path = Path(request.file_path)
  91. try:
  92. if not path.exists():
  93. return JSONResponse(status_code=400, content={
  94. "valid": False,
  95. "message": "路径不存在"
  96. })
  97. if not path.is_file():
  98. return JSONResponse(status_code=400, content={
  99. "valid": False,
  100. "message": "路径不是文件"
  101. })
  102. if not os.access(path, os.R_OK):
  103. return JSONResponse(status_code=400, content={
  104. "valid": False,
  105. "message": "文件不可读"
  106. })
  107. if path.suffix.lower() not in ('.xlsx', '.xls'):
  108. return JSONResponse(status_code=400, content={
  109. "valid": False,
  110. "message": "仅支持Excel文件 (.xlsx/.xls)"
  111. })
  112. return {
  113. "valid": True,
  114. "message": "文件有效",
  115. "file_name": path.name,
  116. "file_size": path.stat().st_size
  117. }
  118. except Exception as e:
  119. logger.error(f"路径验证失败: {str(e)}")
  120. return JSONResponse(status_code=500, content={
  121. "valid": False,
  122. "message": f"路径验证异常: {str(e)}"
  123. })
  124. async def health_check():
  125. """健康检查"""
  126. async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
  127. response = await client.get("/health")
  128. response.raise_for_status()
  129. ret = response.json()
  130. ret.update({"err": 0})
  131. return ret
  132. async def get_request_body(request: Request):
  133. """获取原始请求体"""
  134. return await request.body()
  135. @router.api_route("/fuck/{path:path}", methods=["GET", "POST", "PUT", "DELETE"], tags=["worker"])
  136. async def proxy_worker_requests(
  137. path: str,
  138. request: Request,
  139. body: bytes = Depends(get_request_body)
  140. ):
  141. """全局反向代理到搜索微服务"""
  142. async with httpx.AsyncClient(base_url=WORKER_SERVICE_URL) as client:
  143. try:
  144. # 构造目标请求头(过滤掉不需要的 headers)
  145. headers = {
  146. "Content-Type": request.headers.get("Content-Type", ""),
  147. "Accept": request.headers.get("Accept", "application/json")
  148. }
  149. # 转发请求到目标服务
  150. response = await client.request(
  151. request.method,
  152. f"/{path}",
  153. content=body,
  154. headers=headers,
  155. timeout=60
  156. )
  157. # 流式返回响应内容
  158. return StreamingResponse(
  159. response.aiter_bytes(),
  160. status_code=response.status_code,
  161. headers=dict(response.headers)
  162. )
  163. except httpx.ConnectError:
  164. raise HTTPException(status_code=502, detail="搜索服务不可用")
  165. except Exception as e:
  166. raise HTTPException(status_code=500, detail=str(e))