||
- import asyncio
- import os
- import subprocess
- import hashlib
- from pathlib import Path
- from fastapi import APIRouter, HTTPException
- from datetime import datetime, timedelta
- from pydantic import BaseModel
- from typing import Dict, List, Optional, Union
- import httpx
- from sqlmodel import Session, select, or_,func, update
- import yaml
- import signal
- from asyncio import subprocess
- import multiprocessing
- from multiprocessing import Pipe, Process
- from config.logu import logger, get_logger
- from config.settings import settings
- from routers.subscriptions import list_subscriptions,SubscriptionResponse
- from utils.mihomo_service import port_is_using,find_free_port
- from utils.sub import update_config
- from utils.processes_mgr import process_manager
- from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
# Module-level pool handle; it is initialised to None and is not referenced
# again in the visible part of this module.
POOL = None

# Router on which every mihomo management endpoint below is registered.
mihomo_router = APIRouter()
class MihomoBatchRequest(BaseModel):
    """Request body identifying one mihomo record, optionally with a port.

    Attributes:
        id: Primary key of the ``MihomoMeta`` row to act on.
        port: Mixed port to use; when omitted the start endpoint looks for a
            free port itself.
    """

    id: int
    port: Optional[int] = None
class MihomoRunningStatus(MihomoBatchRequest):
    """A ``MihomoBatchRequest`` extended with process information.

    Attributes:
        pid: Process id of the mihomo instance.
        started_at: Timestamp associated with the process start.
    """

    pid: int
    started_at: datetime
class ProcessInfo(BaseModel):
    """Minimal process descriptor carrying only the provider name."""

    provider_name: str
-
class MihomoResponse(MihomoBatchRequest):
    """A ``MihomoBatchRequest`` echoed back together with an error status.

    Attributes:
        error: Numeric error code; defaults to 0.
        detail: Optional dictionary with additional information.
    """

    error: int = 0
    detail: Optional[Dict] = None
class MihomoMetaWithURL(MihomoMeta, table=False):
    """Non-table view of ``MihomoMeta`` enriched with API-only fields.

    Attributes:
        subscript_file: Optional integer value for the subscription file field.
        external_controller_url: Dashboard URL derived from the instance's
            external controller address, when one is known.
        error: Error message field (added later by the original author).
        detail: Optional dictionary with additional error information.
    """

    subscript_file: Optional[int] = None
    external_controller_url: Optional[str] = None
    error: Optional[str] = None  # error message field added after the fact
    detail: Optional[dict] = None

    class Config:
        arbitrary_types_allowed = True
class ProxiesReachabilityResponse(BaseModel):
    """Per-provider result of a proxy reachability (delay) test.

    Attributes:
        provider_name: Name of the provider whose proxies were tested.
        delays: Mapping of proxy name to measured delay, when the test worked.
        error: Failure description, when the test did not work.
        external_controller_url: Dashboard URL for the tested instance.
    """

    provider_name: str
    delays: Optional[Dict[str, int]] = None
    error: Optional[str] = None
    external_controller_url: Optional[str] = None
-
async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy_name: str, max_retries: int = 5, delay: float = 2.0) -> httpx.Response:
    """Ask a mihomo external controller to select a proxy inside a group.

    Sends ``PUT http://{external_ctl}/proxies/{provider_name}`` with the body
    ``{"name": proxy_name}`` and retries on any httpx error.

    Args:
        external_ctl: ``host:port`` of the external controller.
        provider_name: Name of the proxy group to modify.
        proxy_name: Name of the proxy to select in that group.
        max_retries: Maximum number of attempts; values below 1 are treated
            as a single attempt so the request is always tried.
        delay: Seconds to wait between two attempts.

    Returns:
        The successful ``httpx.Response`` (the original annotation claimed a
        dict, but the response object is what has always been returned).

    Raises:
        HTTPException: With status 500 once every attempt has failed.
    """
    url = f"http://{external_ctl}/proxies/{provider_name}"
    payload = {"name": proxy_name}
    attempts = max(1, max_retries)
    last_error = None

    async with httpx.AsyncClient() as client:
        for attempt in range(attempts):
            if attempt:
                # Pause only between attempts, never after the last one.
                await asyncio.sleep(delay)
            try:
                response = await client.put(url, json=payload)
                response.raise_for_status()
            except httpx.HTTPError as exc:
                # httpx.RequestError and httpx.HTTPStatusError both derive
                # from httpx.HTTPError, so one clause covers transport and
                # status failures alike.
                last_error = exc
                continue
            return response

    raise HTTPException(
        status_code=500,
        detail=f"Failed to select proxy after {attempts} attempts: {str(last_error)}",
    ) from last_error
async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
    """Fetch proxy delays for a group from a mihomo external controller.

    Examples recorded by the original author:
        failure: {'message': 'get delay: all proxies timeout'}
        success: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}

    Args:
        provider_name: Name of the proxy group to test.
        external_controller: ``host:port`` of the external controller.

    Returns:
        The decoded JSON body on success. Any exception raised while
        requesting or decoding is swallowed and reported as
        ``{"error": <message>}`` instead of being propagated.
    """
    endpoint = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
    async with httpx.AsyncClient() as http:
        try:
            reply = await http.get(endpoint, timeout=30)
            reply.raise_for_status()
            payload = reply.json()
        except Exception as exc:
            # Best effort by design: callers receive an error dict, never an exception.
            payload = {"error": str(exc)}
        return payload
def _mihomo_start_error(message: str) -> MihomoMetaWithURL:
    """Build an error payload that matches MihomoMetaWithURL's field types.

    ``error`` is declared ``Optional[str]`` and ``detail`` ``Optional[dict]``,
    so the message is stored as a string and wrapped in a dictionary.
    """
    return MihomoMetaWithURL(error=message, detail={"message": message})


def _mihomo_with_controller_url(model: MihomoMeta) -> MihomoMetaWithURL:
    """Copy a MihomoMeta row into a MihomoMetaWithURL and add the dashboard URL."""
    enriched = MihomoMetaWithURL(**model.model_dump())
    if model.external_controller:
        host, port = model.external_controller.split(":")
        enriched.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
    return enriched


@mihomo_router.post("/start")
async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
    """Start a mihomo process for one ``MihomoMeta`` row.

    Steps: look up the row, return it unchanged if it already has a pid,
    pick the mixed and external-controller ports, generate a temporary
    config through ``update_config``, launch the binary through
    ``process_manager`` and ask the controller to select the row's proxy.
    The row is updated and committed only after proxy selection succeeded.

    Args:
        request: Id of the row to start and, optionally, the mixed port.

    Returns:
        The row enriched with ``external_controller_url`` on success (also
        when it was already running), otherwise a ``MihomoMetaWithURL`` whose
        ``error`` and ``detail`` describe the failure.
    """
    db = SubscriptionManager()
    logger.info(f"{request}")
    with Session(db.engine) as session:
        miho_model = session.exec(
            select(MihomoMeta).where(MihomoMeta.id == request.id)
        ).first()
        if not miho_model:
            return _mihomo_start_error("mihomo not found")

        if miho_model.pid:
            # Already running: report the existing instance in the annotated type.
            return _mihomo_with_controller_url(miho_model)

        sub_file = miho_model.subscript_file
        if sub_file is None:
            # Without this guard the ``file_path`` access below raises AttributeError.
            return _mihomo_start_error("subscription file not found")

        mixed_port = request.port or await find_free_port()
        external_controller_port = await find_free_port((mixed_port + 1, 18000))
        external_controller = f"127.0.0.1:{external_controller_port}"
        temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
        overrides = {
            "mixed-port": mixed_port,
            "external-controller": external_controller,
            "bind-address": "127.0.0.1",
        }
        # NOTE(review): update_config is presumably what writes the merged
        # config to temp_path -- confirm in utils.sub.
        update_config(Path(sub_file.file_path), overrides, Path(temp_path))

        try:
            command = [str(settings.MIHOMO_BIN_PATH), "-f", str(temp_path)]
            logger.info(f"Executing command: {' '.join(command)}")

            pid = process_manager.start_process(command, external_controller_port)

            try:
                await request_select_proxy_name(external_controller, miho_model.provider_name, miho_model.proxy_name)
            except Exception as e:
                logger.error(f"Failed to select proxy: {str(e)}")
                # The process is useless without the requested proxy selected.
                process_manager.stop_process(external_controller_port)
                return _mihomo_start_error(str(e))

            # Touch the row only now, so a failed start leaves it unmodified.
            miho_model.mixed_port = mixed_port
            miho_model.external_controller = external_controller
            miho_model.temp_file_path = str(temp_path)
            miho_model.pid = pid
            miho_model.running = True
            miho_model.updated_at = datetime.now()

            session.add(miho_model)
            session.commit()
            session.refresh(miho_model)
            return _mihomo_with_controller_url(miho_model)

        except Exception as e:
            logger.exception(f"Failed to start mihomo: {str(e)}")
            return _mihomo_start_error(str(e))
@mihomo_router.post("/startup")
async def post_start_each_provider():
    """Start the entries returned by ``get_each_provider_proxies`` concurrently.

    Entry ``i`` is started with mixed port ``9350 + 2 * i``. A failure in one
    start does not cancel the others.

    Returns:
        A list with one element per entry, in order: the value returned by
        ``post_start_mihomo``, or ``{"error": <message>}`` if it raised.
    """
    db = SubscriptionManager()
    providers = db.get_each_provider_proxies()
    logger.info(f"{len(providers)}")
    base_port = 9350

    # Build one start coroutine per entry, each with its own port.
    pending = [
        post_start_mihomo(MihomoBatchRequest(id=int(provider.id), port=base_port + index * 2))
        for index, provider in enumerate(providers)
    ]

    # return_exceptions=True keeps one failing start from aborting the rest.
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    summary = []
    for outcome in outcomes:
        if not isinstance(outcome, Exception):
            summary.append(outcome)
            continue
        logger.error(f"Failed to start mihomo: {str(outcome)}")
        summary.append({"error": str(outcome)})
    return summary
@mihomo_router.post("/stop")
async def post_stop_mihomo(request: MihomoBatchRequest):
    """Stop the mihomo process recorded for one ``MihomoMeta`` row.

    Args:
        request: Carries the id of the row to stop; ``port`` is ignored.

    Returns:
        The refreshed row with ``pid`` cleared and ``running`` set to False.

    Raises:
        HTTPException: 404 if the row does not exist, 400 if it has no pid,
            500 if the process manager fails to stop the process.
    """
    db = SubscriptionManager()
    with Session(db.engine) as session:
        selected_provider = session.exec(
            select(MihomoMeta).where(MihomoMeta.id == request.id)
        ).first()
        if not selected_provider:
            logger.error(f"Provider not found with id {request.id}")
            raise HTTPException(status_code=404, detail="Provider not found")
        if not selected_provider.pid:
            raise HTTPException(status_code=400, detail="Provider is not running")

        # The start endpoint registers and stops processes by the integer
        # controller port, while the row stores "host:port". Derive the same
        # key here; fall back to the stored value if it cannot be parsed.
        controller = selected_provider.external_controller
        try:
            process_key = int(controller.rsplit(":", 1)[1])
        except (AttributeError, IndexError, ValueError):
            process_key = controller

        try:
            process_manager.stop_process(process_key)
        except Exception as e:
            logger.error(f"Failed to stop mihomo: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e)) from e

        selected_provider.pid = None
        selected_provider.running = False
        selected_provider.updated_at = datetime.now()
        session.add(selected_provider)
        session.commit()
        session.refresh(selected_provider)
        return selected_provider
@mihomo_router.get("/")
async def get_mihomo_running_status() -> List[MihomoMetaWithURL]:
    """List the rows returned by ``get_running_proxies`` with dashboard URLs.

    Returns:
        One ``MihomoMetaWithURL`` per row; ``external_controller_url`` is
        filled in only for rows that have an external controller address.
    """
    db = SubscriptionManager()
    enriched = []
    for running in db.get_running_proxies():
        entry = MihomoMetaWithURL(**running.model_dump())
        controller = running.external_controller
        if controller:
            host, port = controller.split(":")
            entry.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
        enriched.append(entry)
    return enriched
def _delay_failure_reason(delay_result) -> Optional[str]:
    """Return why a delay test failed, or None when it holds real delays.

    ``async_proxy_delay`` never raises: it reports failures as
    ``{"error": <str>}``, and its docstring shows the controller using
    ``{"message": <str>}``. Both shapes, raised exceptions and non-dict
    results are treated as failures.
    """
    if isinstance(delay_result, Exception):
        return str(delay_result)
    if not isinstance(delay_result, dict):
        return f"unexpected delay result: {delay_result!r}"
    for key in ("error", "message"):
        reason = delay_result.get(key)
        # Real delay entries map proxy names to numbers, so a string value
        # under one of these keys marks an error payload.
        if isinstance(reason, str):
            return reason
    return None


@mihomo_router.post("/proxies_reachability", response_model=List[ProxiesReachabilityResponse])
async def get_proxies_reachability():
    """Re-measure proxy delays for every running provider instance.

    Clears the ``delay`` column of all ``MihomoMeta`` rows, runs the delay
    test concurrently against each running instance that has an external
    controller, and stores successful measurements through
    ``update_proxy_delays``.

    Returns:
        One ``ProxiesReachabilityResponse`` per tested instance, carrying
        either ``delays`` or ``error``.
    """
    db = SubscriptionManager()

    # Reset all stored delays so stale values do not survive a failed test.
    with Session(db.engine) as session:
        session.exec(update(MihomoMeta).values(delay=None))
        session.commit()

    running_proxies = db.get_each_provider_running_proxies()

    # Only instances with a controller address can be queried.
    valid_proxies = [proxy for proxy in running_proxies if proxy.external_controller]

    delay_results = await asyncio.gather(
        *(
            async_proxy_delay(proxy.provider_name, proxy.external_controller)
            for proxy in valid_proxies
        ),
        return_exceptions=True,
    )

    results = []
    for proxy, delay_result in zip(valid_proxies, delay_results):
        response = ProxiesReachabilityResponse(
            provider_name=proxy.provider_name,
            external_controller_url=f"https://yacd.metacubex.one/?hostname=127.0.0.1&port={proxy.external_controller.split(':')[1]}&secret=#/proxies"
        )

        failure = _delay_failure_reason(delay_result)
        if failure is not None:
            # Keep error payloads out of the database and out of ``delays``.
            logger.error(f"Failed to update delays for {proxy.provider_name}: {failure}")
            response.error = failure
        else:
            db.update_proxy_delays(
                provider_name=proxy.provider_name,
                delays=delay_result
            )
            response.delays = delay_result

        results.append(response)

    return results
@mihomo_router.post("/start_all_proxies_reachability")
async def start_all_proxies_reachability():
    """Start every provider, measure delays, then start all reachable proxies.

    Runs ``post_start_each_provider`` and ``get_proxies_reachability`` first,
    then starts every ``MihomoMeta`` row whose ``delay`` is not NULL.

    Returns:
        A list with one element per reachable row: the value returned by
        ``post_start_mihomo``, or ``{"error": <message>}`` if it raised.
    """
    startup_results = await post_start_each_provider()
    await get_proxies_reachability()

    db = SubscriptionManager()
    with Session(db.engine) as session:
        reachable = session.exec(
            select(MihomoMeta).where(MihomoMeta.delay.is_not(None))
        ).all()
        # Only the ids are needed, so the session can close before the
        # concurrent starts open sessions of their own.
        reachable_ids = [int(proxy.id) for proxy in reachable]

    # post_start_each_provider hands out mixed ports 9350, 9352, ... (one per
    # entry). Continue after that range instead of reusing it, so new
    # instances do not collide with the ones started above.
    port_start = 9350 + 2 * len(startup_results or [])
    tasks = [
        post_start_mihomo(MihomoBatchRequest(id=proxy_id, port=port_start + 2 * index))
        for index, proxy_id in enumerate(reachable_ids)
    ]

    # return_exceptions=True keeps one failing start from aborting the rest.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    ret = []
    for result in results:
        if isinstance(result, Exception):
            logger.error(f"Failed to start mihomo: {str(result)}")
            ret.append({"error": str(result)})
        else:
            ret.append(result)

    return ret
@mihomo_router.get("/external-controller")
async def get_controller_urls():
    """Return the dashboard URL of every running instance that has a controller.

    Returns:
        A list of URL strings; running entries without an external
        controller address are skipped instead of raising AttributeError.
    """
    running_list = await get_mihomo_running_status()
    urls = []
    for item in running_list:
        if not item.external_controller:
            continue
        host, port = item.external_controller.split(":")
        urls.append(f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies")
    return urls
async def stop_all_mihomo():
    """Stop every running mihomo instance that has a pid, best effort.

    A failure to stop one instance is logged and does not prevent the
    remaining instances from being stopped.
    """
    running_list = await get_mihomo_running_status()
    for item in running_list:
        if not item.pid:
            continue
        logger.info(f"stop mihomo {item}")
        try:
            await post_stop_mihomo(MihomoBatchRequest(id=item.id))
        except Exception:
            # post_stop_mihomo signals failures with HTTPException; keep going
            # so the other processes are still stopped.
            logger.exception(f"Failed to stop mihomo {item.id}")
|