mihomo.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. import asyncio
  2. import os
  3. import subprocess
  4. import hashlib
  5. from pathlib import Path
  6. from fastapi import APIRouter, HTTPException
  7. from datetime import datetime, timedelta
  8. from pydantic import BaseModel
  9. from typing import Dict, List, Optional, Union
  10. import httpx
  11. from sqlmodel import Session, select, or_,func, update
  12. import yaml
  13. import signal
  14. from asyncio import subprocess
  15. import multiprocessing
  16. from multiprocessing import Pipe, Process
  17. from config.logu import logger, get_logger
  18. from config.settings import settings
  19. from routers.subscriptions import list_subscriptions,SubscriptionResponse
  20. from utils.mihomo_service import port_is_using,find_free_port
  21. from utils.sub import update_config
  22. from utils.processes_mgr import process_manager
  23. from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
  24. POOL = None
  25. mihomo_router = APIRouter()
  26. class MihomoBatchRequest(BaseModel):
  27. id: int
  28. port: Optional[int] = None
  29. class MihomoRunningStatus(MihomoBatchRequest):
  30. pid: int
  31. started_at: datetime
  32. class ProcessInfo(BaseModel):
  33. provider_name: str
  34. class MihomoResponse(MihomoBatchRequest):
  35. error: int = 0
  36. detail: Optional[Dict] = None
  37. class MihomoMetaWithURL(MihomoMeta, table=False):
  38. subscript_file: Optional[int] = None
  39. external_controller_url: Optional[str] = None
  40. error: Optional[str] = None # 新增错误信息字段
  41. detail: Optional[dict] = None
  42. class Config:
  43. arbitrary_types_allowed = True
  44. class ProxiesReachabilityResponse(BaseModel):
  45. provider_name: str
  46. delays: Optional[Dict[str, int]] = None
  47. error: Optional[str] = None
  48. external_controller_url: Optional[str] = None
  49. async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy_name: str, max_retries: int = 5, delay: float = 2.0) -> Optional[dict]:
  50. url = f"http://{external_ctl}/proxies/{provider_name}"
  51. payload = {"name": proxy_name}
  52. async with httpx.AsyncClient() as client:
  53. for attempt in range(max_retries):
  54. try:
  55. response = await client.put(url, json=payload)
  56. response.raise_for_status()
  57. return response
  58. except (httpx.HTTPError, httpx.RequestError) as e:
  59. if attempt < max_retries - 1:
  60. await asyncio.sleep(delay)
  61. else:
  62. raise HTTPException(status_code=500, detail=f"Failed to select proxy after {max_retries} attempts: {str(e)}")
  63. async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
  64. """异步获取代理延迟
  65. 失败: {'message': 'get delay: all proxies timeout'}
  66. 成功: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}
  67. """
  68. url = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
  69. async with httpx.AsyncClient() as client:
  70. try:
  71. response = await client.get(url, timeout=30)
  72. response.raise_for_status()
  73. return response.json()
  74. except Exception as e:
  75. return {"error": str(e)}
  76. @mihomo_router.post("/start")
  77. async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL :
  78. db = SubscriptionManager()
  79. logger.info(f"{request}")
  80. with Session(db.engine) as session:
  81. miho_model = session.exec(
  82. select(MihomoMeta)
  83. .where(MihomoMeta.id == request.id)
  84. ).first()
  85. if not miho_model:
  86. return MihomoMetaWithURL(error=1, detail="mihomo not found")
  87. sub_file = miho_model.subscript_file
  88. if miho_model.pid:
  89. return miho_model
  90. mixed_port = request.port
  91. if not mixed_port:
  92. mixed_port = await find_free_port()
  93. external_controller_port = await find_free_port((mixed_port+1, 18000))
  94. config = {}
  95. temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
  96. config['mixed-port'] = mixed_port
  97. config['external-controller'] = f'127.0.0.1:{external_controller_port}'
  98. config['bind-address'] = '127.0.0.1'
  99. # logger.info(f"sub_file.file_path {sub_file.file_path}")
  100. # logger.info(f"temp_path {temp_path}")
  101. # logger.info(f"config {config}")
  102. res = update_config(Path(sub_file.file_path), config, Path(temp_path))
  103. try:
  104. command = [str(settings.MIHOMO_BIN_PATH), "-f", str(temp_path)]
  105. logger.info(f"Executing command: {' '.join(command)}")
  106. pid = process_manager.start_process(command, external_controller_port)
  107. miho_model.mixed_port = mixed_port
  108. miho_model.external_controller = f'127.0.0.1:{external_controller_port}'
  109. miho_model.temp_file_path = str(temp_path)
  110. miho_model.pid = pid
  111. miho_model.running = True
  112. miho_model.updated_at = datetime.now()
  113. try:
  114. await request_select_proxy_name(miho_model.external_controller, miho_model.provider_name, miho_model.proxy_name)
  115. except Exception as e:
  116. logger.error(f"Failed to select proxy: {str(e)}")
  117. process_manager.stop_process(external_controller_port)
  118. return MihomoMetaWithURL(error=1, detail=str(e))
  119. session.add(miho_model)
  120. session.commit()
  121. session.refresh(miho_model)
  122. mihomo_with_url = MihomoMetaWithURL(**miho_model.model_dump())
  123. if miho_model.external_controller:
  124. host, port = miho_model.external_controller.split(":")
  125. mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
  126. return mihomo_with_url
  127. except Exception as e:
  128. logger.exception(f"Failed to start mihomo: {str(e)}")
  129. return MihomoMetaWithURL(error=1, detail=str(e))
  130. @mihomo_router.post("/startup")
  131. async def post_start_each_provider():
  132. db = SubscriptionManager()
  133. results = db.get_each_provider_proxies()
  134. logger.info(f"{len(results)}")
  135. port_start = 9350
  136. # 创建所有任务的列表,为每个任务分配不同的端口号
  137. tasks = []
  138. for idx, provider_moho in enumerate(results):
  139. port = port_start + idx * 2
  140. tasks.append(
  141. post_start_mihomo(MihomoBatchRequest(
  142. id=int(provider_moho.id),
  143. port=port
  144. ))
  145. )
  146. # 并发执行所有任务,并允许任务抛出异常而不中断其他任务
  147. results = await asyncio.gather(*tasks, return_exceptions=True)
  148. # 处理结果和错误
  149. ret = []
  150. for result in results:
  151. if isinstance(result, Exception):
  152. # 如果是异常,记录错误并添加到 ret 中
  153. logger.error(f"Failed to start mihomo: {str(result)}")
  154. ret.append({"error": str(result)})
  155. else:
  156. # 如果是正常结果,直接添加到 ret 中
  157. ret.append(result)
  158. return ret
  159. @mihomo_router.post("/stop")
  160. async def post_stop_mihomo(request: MihomoBatchRequest):
  161. db = SubscriptionManager()
  162. with Session(db.engine) as session:
  163. selected_provider = session.exec(
  164. select(MihomoMeta)
  165. .where(MihomoMeta.id == request.id)
  166. ).first()
  167. if not selected_provider:
  168. logger.error(f"Provider not found with id {request.id}")
  169. raise HTTPException(status_code=404, detail="Provider not found")
  170. if selected_provider.pid:
  171. try:
  172. process_manager.stop_process(selected_provider.external_controller)
  173. except Exception as e:
  174. logger.error(f"Failed to stop mihomo: {str(e)}")
  175. raise HTTPException(status_code=500, detail=str(e))
  176. selected_provider.pid = None
  177. selected_provider.running = False
  178. selected_provider.updated_at = datetime.now()
  179. session.add(selected_provider)
  180. session.commit()
  181. session.refresh(selected_provider)
  182. return selected_provider
  183. else:
  184. raise HTTPException(status_code=400, detail="Provider is not running")
  185. @mihomo_router.get("/")
  186. async def get_mihomo_running_status() -> List[MihomoMetaWithURL]:
  187. db = SubscriptionManager()
  188. all = db.get_running_proxies()
  189. result = []
  190. for mihomo_model in all:
  191. mihomo_with_url = MihomoMetaWithURL(**mihomo_model.model_dump())
  192. if mihomo_model.external_controller:
  193. host, port = mihomo_model.external_controller.split(":")
  194. mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
  195. result.append(mihomo_with_url)
  196. return result
  197. @mihomo_router.post("/proxies_reachability", response_model=List[ProxiesReachabilityResponse])
  198. async def get_proxies_reachability():
  199. db = SubscriptionManager()
  200. # 清空所有代理的延迟值
  201. with Session(db.engine) as session:
  202. session.exec(
  203. update(MihomoMeta)
  204. .values(delay=None)
  205. )
  206. session.commit()
  207. # 获取所有运行中的代理
  208. running_proxies = db.get_each_provider_running_proxies()
  209. # 过滤掉没有 external_controller 的代理
  210. valid_proxies = [proxy for proxy in running_proxies if proxy.external_controller]
  211. # 创建并发任务列表
  212. tasks = [
  213. async_proxy_delay(proxy.provider_name, proxy.external_controller)
  214. for proxy in valid_proxies
  215. ]
  216. # 并发执行所有延迟测试
  217. delay_results = await asyncio.gather(*tasks, return_exceptions=True)
  218. # 处理结果并更新数据库
  219. results = []
  220. for proxy, delay_result in zip(valid_proxies, delay_results):
  221. response = ProxiesReachabilityResponse(
  222. provider_name=proxy.provider_name,
  223. external_controller_url=f"https://yacd.metacubex.one/?hostname=127.0.0.1&port={proxy.external_controller.split(':')[1]}&secret=#/proxies"
  224. )
  225. if isinstance(delay_result, Exception):
  226. # 处理错误情况
  227. logger.error(f"Failed to update delays for {proxy.provider_name}: {str(delay_result)}")
  228. response.error = str(delay_result)
  229. else:
  230. # 更新数据库并收集结果
  231. db.update_proxy_delays(
  232. provider_name=proxy.provider_name,
  233. delays=delay_result
  234. )
  235. response.delays = delay_result
  236. results.append(response)
  237. return results
  238. @mihomo_router.post("/start_all_proxies_reachability")
  239. async def start_all_proxies_reachability():
  240. await post_start_each_provider()
  241. await get_proxies_reachability()
  242. db = SubscriptionManager()
  243. with Session(db.engine) as session:
  244. proxies_reachability = session.exec(
  245. select(MihomoMeta)
  246. .where(MihomoMeta.delay.is_not(None))
  247. ).all()
  248. tasks = []
  249. port_start = 9350
  250. for proxy in proxies_reachability:
  251. tasks.append(post_start_mihomo(MihomoBatchRequest(id=int(proxy.id), port=port_start)))
  252. port_start += 2
  253. # 并发执行所有任务,并允许任务抛出异常而不中断其他任务
  254. results = await asyncio.gather(*tasks, return_exceptions=True)
  255. # 处理结果和错误
  256. ret = []
  257. for result in results:
  258. if isinstance(result, Exception):
  259. # 如果是异常,记录错误并添加到 ret 中
  260. logger.error(f"Failed to start mihomo: {str(result)}")
  261. ret.append({"error": str(result)})
  262. else:
  263. # 如果是正常结果,直接添加到 ret 中
  264. ret.append(result)
  265. return ret
  266. @mihomo_router.get("/external-controller")
  267. async def get_controller_urls():
  268. running_list = await get_mihomo_running_status()
  269. urls = []
  270. for item in running_list:
  271. host, port = item.external_controller.split(":")
  272. urls.append(f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies")
  273. return urls
  274. async def stop_all_mihomo():
  275. running_list = await get_mihomo_running_status()
  276. for item in running_list:
  277. if item.pid:
  278. logger.info(f"stop mihomo {item}")
  279. await post_stop_mihomo(MihomoBatchRequest(id=item.id))