mihomo.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import asyncio
  2. import os
  3. import subprocess
  4. import hashlib
  5. from pathlib import Path
  6. from fastapi import APIRouter, HTTPException
  7. from datetime import datetime, timedelta
  8. from pydantic import BaseModel
  9. from typing import Dict, List, Optional, Union
  10. import httpx
  11. from sqlalchemy import JSON
  12. from sqlmodel import Field, Session, select, or_,func, update
  13. import yaml
  14. import signal
  15. from asyncio import subprocess
  16. import multiprocessing
  17. from multiprocessing import Pipe, Process
  18. from config.logu import logger, get_logger
  19. from config.settings import settings
  20. from routers.subscriptions import list_subscriptions,SubscriptionResponse
  21. from utils.mihomo_service import port_is_using,find_free_port
  22. from utils.sub import update_config
  23. from utils.processes_mgr import process_manager
  24. from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
# NOTE(review): POOL is not referenced anywhere in the visible part of this
# module — confirm whether it is still needed.
POOL = None
# Router on which the endpoints defined in this module are registered.
mihomo_router = APIRouter()
class MihomoBatchRequest(BaseModel):
    """Request body that identifies one ``MihomoMeta`` row, with an optional port."""
    # Primary key matched against ``MihomoMeta.id`` by the start/stop endpoints.
    id: int
    # Requested mixed port; the start endpoint looks for a free port when unset.
    port: Optional[int] = None
class MihomoRunningStatus(MihomoBatchRequest):
    """``MihomoBatchRequest`` extended with a process id and a start timestamp."""
    pid: int
    started_at: datetime
class ProcessInfo(BaseModel):
    """Model carrying only a provider name."""
    provider_name: str
class MihomoResponse(MihomoBatchRequest):
    """``MihomoBatchRequest`` extended with an integer error code and a detail mapping."""
    # Defaults to 0; presumably 0 means "no error" — TODO confirm with callers.
    error: int = 0
    detail: Optional[Dict] = None
class MihomoMetaWithURL(MihomoMeta, table=False):
    """Non-table variant of ``MihomoMeta`` with extra response-only fields.

    The endpoints in this module build it from ``MihomoMeta.model_dump()`` and
    then fill in ``external_controller_url``.
    """
    # Redeclared here as an optional integer.
    subscript_file: Optional[int] = None
    # Dashboard link derived from the ``host:port`` external controller address.
    external_controller_url: Optional[str] = None
    error: Optional[str] = None  # newly added error-message field
    detail: Optional[Dict] = Field(default={}, sa_type=JSON)

    class Config:
        arbitrary_types_allowed = True
class ProxiesReachabilityResponse(BaseModel):
    """Per-provider result returned by the ``/proxies_reachability`` endpoint."""
    provider_name: str
    # Mapping of name to integer delay when the measurement succeeded.
    delays: Optional[Dict[str, int]] = None
    # Failure description when the measurement did not succeed.
    error: Optional[str] = None
    # Dashboard link for the provider's external controller.
    external_controller_url: Optional[str] = None
  50. async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy_name: str, max_retries: int = 5, delay: float = 2.0) -> Optional[dict]:
  51. url = f"http://{external_ctl}/proxies/{provider_name}"
  52. payload = {"name": proxy_name}
  53. async with httpx.AsyncClient() as client:
  54. for attempt in range(max_retries):
  55. try:
  56. response = await client.put(url, json=payload)
  57. response.raise_for_status()
  58. return response
  59. except (httpx.HTTPError, httpx.RequestError) as e:
  60. if attempt < max_retries - 1:
  61. await asyncio.sleep(delay)
  62. else:
  63. raise HTTPException(status_code=500, detail=f"Failed to select proxy after {max_retries} attempts: {str(e)}")
  64. async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
  65. """异步获取代理延迟
  66. 失败: {'message': 'get delay: all proxies timeout'}
  67. 成功: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}
  68. """
  69. url = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
  70. async with httpx.AsyncClient() as client:
  71. try:
  72. response = await client.get(url, timeout=30)
  73. response.raise_for_status()
  74. return response.json()
  75. except Exception as e:
  76. return {"error": str(e)}
  77. @mihomo_router.post("/start")
  78. async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL :
  79. db = SubscriptionManager()
  80. logger.info(f"{request}")
  81. with Session(db.engine) as session:
  82. miho_model = session.exec(
  83. select(MihomoMeta)
  84. .where(MihomoMeta.id == request.id)
  85. ).first()
  86. if not miho_model:
  87. return MihomoMetaWithURL(error=1, detail="mihomo not found")
  88. sub_file = miho_model.subscript_file
  89. if miho_model.pid:
  90. return miho_model
  91. mixed_port = request.port
  92. if not mixed_port:
  93. mixed_port = await find_free_port()
  94. external_controller_port = await find_free_port((mixed_port+1, 18000))
  95. config = {}
  96. temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
  97. config['mixed-port'] = mixed_port
  98. config['external-controller'] = f'127.0.0.1:{external_controller_port}'
  99. config['bind-address'] = '127.0.0.1'
  100. # logger.info(f"sub_file.file_path {sub_file.file_path}")
  101. # logger.info(f"temp_path {temp_path}")
  102. # logger.info(f"config {config}")
  103. res = update_config(Path(sub_file.file_path), config, Path(temp_path))
  104. try:
  105. command = [str(settings.MIHOMO_BIN_PATH), "-f", str(temp_path)]
  106. logger.info(f"Executing command: {' '.join(command)}")
  107. pid = process_manager.start_process(command, external_controller_port)
  108. miho_model.mixed_port = mixed_port
  109. miho_model.external_controller = f'127.0.0.1:{external_controller_port}'
  110. miho_model.temp_file_path = str(temp_path)
  111. miho_model.pid = pid
  112. miho_model.running = True
  113. miho_model.updated_at = datetime.now()
  114. try:
  115. await request_select_proxy_name(miho_model.external_controller, miho_model.provider_name, miho_model.proxy_name)
  116. except Exception as e:
  117. logger.error(f"Failed to select proxy: {str(e)}")
  118. process_manager.stop_process(external_controller_port)
  119. return MihomoMetaWithURL(error=1, detail=str(e))
  120. session.add(miho_model)
  121. session.commit()
  122. session.refresh(miho_model)
  123. mihomo_with_url = MihomoMetaWithURL(**miho_model.model_dump())
  124. if miho_model.external_controller:
  125. host, port = miho_model.external_controller.split(":")
  126. mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
  127. return mihomo_with_url
  128. except Exception as e:
  129. logger.exception(f"Failed to start mihomo: {str(e)}")
  130. return MihomoMetaWithURL(error=1, detail=str(e))
  131. @mihomo_router.post("/startup")
  132. async def post_start_each_provider():
  133. db = SubscriptionManager()
  134. results = db.get_each_provider_proxies()
  135. logger.info(f"{len(results)}")
  136. port_start = 9350
  137. # 创建所有任务的列表,为每个任务分配不同的端口号
  138. tasks = []
  139. for idx, provider_moho in enumerate(results):
  140. port = port_start + idx * 2
  141. tasks.append(
  142. post_start_mihomo(MihomoBatchRequest(
  143. id=int(provider_moho.id),
  144. port=port
  145. ))
  146. )
  147. # 并发执行所有任务,并允许任务抛出异常而不中断其他任务
  148. results = await asyncio.gather(*tasks, return_exceptions=True)
  149. # 处理结果和错误
  150. ret = []
  151. for result in results:
  152. if isinstance(result, Exception):
  153. # 如果是异常,记录错误并添加到 ret 中
  154. logger.error(f"Failed to start mihomo: {str(result)}")
  155. ret.append({"error": str(result)})
  156. else:
  157. # 如果是正常结果,直接添加到 ret 中
  158. ret.append(result)
  159. return ret
  160. @mihomo_router.post("/stop")
  161. async def post_stop_mihomo(request: MihomoBatchRequest):
  162. db = SubscriptionManager()
  163. with Session(db.engine) as session:
  164. selected_provider = session.exec(
  165. select(MihomoMeta)
  166. .where(MihomoMeta.id == request.id)
  167. ).first()
  168. if not selected_provider:
  169. logger.error(f"Provider not found with id {request.id}")
  170. raise HTTPException(status_code=404, detail="Provider not found")
  171. if selected_provider.pid:
  172. try:
  173. process_manager.stop_process(selected_provider.external_controller)
  174. except Exception as e:
  175. logger.error(f"Failed to stop mihomo: {str(e)}")
  176. raise HTTPException(status_code=500, detail=str(e))
  177. selected_provider.pid = None
  178. selected_provider.running = False
  179. selected_provider.updated_at = datetime.now()
  180. session.add(selected_provider)
  181. session.commit()
  182. session.refresh(selected_provider)
  183. return selected_provider
  184. else:
  185. raise HTTPException(status_code=400, detail="Provider is not running")
  186. @mihomo_router.get("/")
  187. async def get_mihomo_running_status() -> List[MihomoMetaWithURL]:
  188. db = SubscriptionManager()
  189. all = db.get_running_proxies()
  190. result = []
  191. for mihomo_model in all:
  192. mihomo_with_url = MihomoMetaWithURL(**mihomo_model.model_dump())
  193. if mihomo_model.external_controller:
  194. host, port = mihomo_model.external_controller.split(":")
  195. mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
  196. result.append(mihomo_with_url)
  197. return result
  198. @mihomo_router.post("/proxies_reachability", response_model=List[ProxiesReachabilityResponse])
  199. async def get_proxies_reachability():
  200. db = SubscriptionManager()
  201. # 清空所有代理的延迟值
  202. with Session(db.engine) as session:
  203. session.exec(
  204. update(MihomoMeta)
  205. .values(delay=None)
  206. )
  207. session.commit()
  208. # 获取所有运行中的代理
  209. running_proxies = db.get_each_provider_running_proxies()
  210. # 过滤掉没有 external_controller 的代理
  211. valid_proxies = [proxy for proxy in running_proxies if proxy.external_controller]
  212. # 创建并发任务列表
  213. tasks = [
  214. async_proxy_delay(proxy.provider_name, proxy.external_controller)
  215. for proxy in valid_proxies
  216. ]
  217. # 并发执行所有延迟测试
  218. delay_results = await asyncio.gather(*tasks, return_exceptions=True)
  219. # 处理结果并更新数据库
  220. results = []
  221. for proxy, delay_result in zip(valid_proxies, delay_results):
  222. response = ProxiesReachabilityResponse(
  223. provider_name=proxy.provider_name,
  224. external_controller_url=f"https://yacd.metacubex.one/?hostname=127.0.0.1&port={proxy.external_controller.split(':')[1]}&secret=#/proxies"
  225. )
  226. if isinstance(delay_result, Exception):
  227. # 处理错误情况
  228. logger.error(f"Failed to update delays for {proxy.provider_name}: {str(delay_result)}")
  229. response.error = str(delay_result)
  230. else:
  231. # 更新数据库并收集结果
  232. db.update_proxy_delays(
  233. provider_name=proxy.provider_name,
  234. delays=delay_result
  235. )
  236. response.delays = delay_result
  237. results.append(response)
  238. return results
  239. @mihomo_router.post("/start_all_proxies_reachability")
  240. async def start_all_proxies_reachability():
  241. await post_start_each_provider()
  242. await get_proxies_reachability()
  243. db = SubscriptionManager()
  244. with Session(db.engine) as session:
  245. proxies_reachability = session.exec(
  246. select(MihomoMeta)
  247. .where(MihomoMeta.delay.is_not(None))
  248. ).all()
  249. tasks = []
  250. port_start = 9350
  251. for proxy in proxies_reachability:
  252. tasks.append(post_start_mihomo(MihomoBatchRequest(id=int(proxy.id), port=port_start)))
  253. port_start += 2
  254. # 并发执行所有任务,并允许任务抛出异常而不中断其他任务
  255. results = await asyncio.gather(*tasks, return_exceptions=True)
  256. # 处理结果和错误
  257. ret = []
  258. for result in results:
  259. if isinstance(result, Exception):
  260. # 如果是异常,记录错误并添加到 ret 中
  261. logger.error(f"Failed to start mihomo: {str(result)}")
  262. ret.append({"error": str(result)})
  263. else:
  264. # 如果是正常结果,直接添加到 ret 中
  265. ret.append(result)
  266. return ret
  267. @mihomo_router.get("/external-controller")
  268. async def get_controller_urls():
  269. running_list = await get_mihomo_running_status()
  270. urls = []
  271. for item in running_list:
  272. host, port = item.external_controller.split(":")
  273. urls.append(f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies")
  274. return urls
  275. async def stop_all_mihomo():
  276. running_list = await get_mihomo_running_status()
  277. for item in running_list:
  278. if item.pid:
  279. logger.info(f"stop mihomo {item}")
  280. await post_stop_mihomo(MihomoBatchRequest(id=item.id))