"""FastAPI routes for starting, stopping and inspecting mihomo proxy processes."""
import asyncio
import os
import subprocess
import hashlib
from pathlib import Path
from fastapi import APIRouter, HTTPException
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Dict, List, Optional
import httpx
from sqlmodel import Session, select, or_, func
import yaml
import signal
# NOTE: this rebinds the name `subprocess` imported above to asyncio.subprocess.
from asyncio import subprocess
import multiprocessing
from multiprocessing import Pipe, Process

from config.logu import logger, get_logger
from config.settings import settings
from routers.subscriptions import list_subscriptions, SubscriptionResponse
from utils.mihomo_service import port_is_using, find_free_port
from utils.sub import update_config
from utils.processes_mgr import process_manager
from database.models.subscription import SubscriptionManager, SubscriptFile, MihomoMeta

# Global variable reserved for holding a process pool.
POOL = None
mihomo_router = APIRouter()


class MihomoBatchRequest(BaseModel):
    """Request body identifying a MihomoMeta row, with an optional mixed port."""

    id: int
    port: Optional[int] = None


class MihomoRunningStatus(MihomoBatchRequest):
    """Request fields plus the pid and start time of a running instance."""

    pid: int
    started_at: datetime


class ProcessInfo(BaseModel):
    """Carries a provider name."""

    provider_name: str


class MihomoResponse(MihomoBatchRequest):
    """Request fields plus an error code and optional detail payload."""

    error: int = 0
    detail: Optional[Dict] = None


class MihomoMetaWithURL(MihomoMeta, table=False):
    """Non-table view of MihomoMeta extended with a dashboard URL."""

    # Override the relationship field that causes problems:
    # use the foreign-key type instead of the relationship object.
    subscript_file: Optional[int] = None
    external_controller_url: Optional[str] = None

    class Config:
        arbitrary_types_allowed = True


def _yacd_dashboard_url(external_controller: str) -> str:
    """Build the YACD dashboard URL for a ``host:port`` controller address.

    Raises:
        ValueError: if ``external_controller`` contains no ``:`` separator.
    """
    # Split on the last colon so only the trailing port is separated.
    host, port = external_controller.rsplit(":", 1)
    return f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"


def _with_dashboard_url(model: MihomoMeta) -> MihomoMetaWithURL:
    """Copy ``model`` into a MihomoMetaWithURL, filling the URL when possible."""
    enriched = MihomoMetaWithURL(**model.model_dump())
    if model.external_controller:
        enriched.external_controller_url = _yacd_dashboard_url(model.external_controller)
    return enriched


async def request_select_proxy_name(
    external_ctl: str,
    provider_name: str,
    proxy_name: str,
    max_retries: int = 5,
    delay: float = 2.0,
) -> Optional[httpx.Response]:
    """PUT the selected proxy name to the controller, retrying on failure.

    Args:
        external_ctl: ``host:port`` of the external controller.
        provider_name: Path segment appended to ``/proxies/``.
        proxy_name: Value sent as ``{"name": ...}`` in the JSON body.
        max_retries: Maximum number of attempts.
        delay: Seconds to sleep between failed attempts.

    Returns:
        The successful response object (the endpoint itself has no return
        value), or ``None`` when ``max_retries`` is not positive and no
        attempt is made.

    Raises:
        HTTPException: status 500 once the final attempt has failed.
    """
    url = f"http://{external_ctl}/proxies/{provider_name}"
    payload = {"name": proxy_name}
    async with httpx.AsyncClient() as client:
        for attempt in range(max_retries):
            try:
                response = await client.put(url, json=payload)
                response.raise_for_status()  # raise if the request failed
                return response
            except (httpx.HTTPError, httpx.RequestError) as e:
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)  # wait before retrying
                else:
                    raise HTTPException(
                        status_code=500,
                        detail=f"Failed to select proxy after {max_retries} attempts: {str(e)}",
                    ) from e
    return None


@mihomo_router.post("/start")
async def post_start_mihomo(request: MihomoBatchRequest):
    """Start a mihomo process for the requested MihomoMeta row.

    Returns the stored row unchanged when it already has a pid; otherwise
    writes a temporary config, starts the process, selects the proxy through
    the external controller, persists the new state and returns it together
    with its dashboard URL.

    Raises:
        HTTPException: 404 when the row or its subscription file is missing,
            500 when starting the process or selecting the proxy fails.
    """
    db = SubscriptionManager()
    logger.info(f"{request}")
    with Session(db.engine) as session:
        # Look up the requested record.
        miho_model = session.exec(
            select(MihomoMeta).where(MihomoMeta.id == request.id)
        ).first()
        if not miho_model:
            raise HTTPException(status_code=404, detail="Provider not found")
        sub_file = miho_model.subscript_file
        if miho_model.pid:
            # Already has a pid recorded: return the stored row as-is.
            return miho_model
        if sub_file is None:
            # Without this guard the attribute access below would surface as
            # an unhandled AttributeError.
            raise HTTPException(status_code=404, detail="Subscription file not found")

        # If no port was specified, find a free one.
        mixed_port = request.port
        if not mixed_port:
            mixed_port = find_free_port()
        external_controller_port = find_free_port((mixed_port + 1, 18000))

        # Path of the temporary config file.
        temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
        # Port overrides applied to the subscription config.
        config = {
            'mixed-port': mixed_port,
            'external-controller': f'127.0.0.1:{external_controller_port}',
            'bind-address': '127.0.0.1',
        }
        logger.info(f"sub_file.file_path {sub_file.file_path}")
        logger.info(f"temp_path {temp_path}")
        logger.info(f"config {config}")
        update_config(Path(sub_file.file_path), config, Path(temp_path))

        # Start the process.
        try:
            command = [str(settings.MIHOMO_BIN_PATH), "-f", str(temp_path)]
            logger.info(f"Executing command: {' '.join(command)}")
            pid = process_manager.start_process(command, external_controller_port)
            miho_model.mixed_port = mixed_port
            miho_model.external_controller = f'127.0.0.1:{external_controller_port}'
            miho_model.temp_file_path = str(temp_path)
            miho_model.pid = pid
            miho_model.running = True
            miho_model.updated_at = datetime.now()
            try:
                await request_select_proxy_name(
                    miho_model.external_controller,
                    miho_model.provider_name,
                    miho_model.proxy_name,
                )
            except Exception as e:
                logger.error(f"Failed to select proxy: {str(e)}")
                # Stop the process that was just started.
                process_manager.stop_process(external_controller_port)
                # Reuse the original detail so the message is not prefixed
                # with the status code of the wrapped exception.
                detail = e.detail if isinstance(e, HTTPException) else str(e)
                raise HTTPException(status_code=500, detail=detail) from e

            # Persist the updated record.
            session.add(miho_model)
            session.commit()
            session.refresh(miho_model)
            return _with_dashboard_url(miho_model)
        except HTTPException:
            # Already a well-formed HTTP error (logged above); do not re-wrap.
            raise
        except Exception as e:
            logger.exception(f"Failed to start mihomo: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e)) from e


@mihomo_router.post("/startup")
async def post_start_each_provider():
    """Start one mihomo instance per provider (the row with the smallest id).

    Returns:
        The list of results returned by ``post_start_mihomo``.

    Raises:
        HTTPException: propagated from ``post_start_mihomo``; any other error
            is reported as status 500. The first failure aborts the loop.
    """
    db = SubscriptionManager()
    with Session(db.engine) as session:
        # Subquery: smallest id for each provider_name.
        subquery = (
            select(
                MihomoMeta.provider_name,
                func.min(MihomoMeta.id).label("min_id"),
            )
            .group_by(MihomoMeta.provider_name)
            .subquery()
        )
        # Main query: join against the subquery to get each provider's first row.
        stmt = select(MihomoMeta).join(subquery, MihomoMeta.id == subquery.c.min_id)
        # Only the ids are needed; post_start_mihomo reloads each row itself.
        provider_ids = [int(row.id) for row in session.exec(stmt).all()]

    ret = []
    for provider_id in provider_ids:
        try:
            res = await post_start_mihomo(MihomoBatchRequest(id=provider_id))
        except HTTPException as e:
            # Keep the original status code and detail instead of re-wrapping.
            logger.exception(f"Failed to start mihomo: {str(e)}")
            raise
        except Exception as e:
            logger.exception(f"Failed to start mihomo: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e)) from e
        ret.append(res)
    return ret


@mihomo_router.post("/stop")
async def post_stop_mihomo(request: MihomoBatchRequest):
    """Stop the mihomo process recorded for the requested row.

    Returns:
        The refreshed row with ``pid`` cleared and ``running`` set to False.

    Raises:
        HTTPException: 404 when the row does not exist, 400 when it has no
            pid recorded, 500 when stopping the process fails.
    """
    db = SubscriptionManager()
    with Session(db.engine) as session:
        selected_provider = session.exec(
            select(MihomoMeta).where(MihomoMeta.id == request.id)
        ).first()
        if not selected_provider:
            logger.error(f"Provider not found with id {request.id}")
            raise HTTPException(status_code=404, detail="Provider not found")
        if not selected_provider.pid:
            raise HTTPException(status_code=400, detail="Provider is not running")

        try:
            # NOTE(review): post_start_mihomo registers and stops the process
            # using the integer controller port, while this call passes the
            # "host:port" string. Confirm process_manager accepts both.
            process_manager.stop_process(selected_provider.external_controller)
        except Exception as e:
            logger.error(f"Failed to stop mihomo: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e)) from e

        selected_provider.pid = None
        selected_provider.running = False
        selected_provider.updated_at = datetime.now()
        session.add(selected_provider)
        session.commit()
        session.refresh(selected_provider)
        return selected_provider


@mihomo_router.get("/")
async def get_mihomo_running_status():
    """Return every row that has a pid recorded, each with its dashboard URL."""
    db = SubscriptionManager()
    with Session(db.engine) as session:
        running_models = session.exec(
            select(MihomoMeta).where(MihomoMeta.pid.is_not(None))
        ).all()
        return [_with_dashboard_url(model) for model in running_models]


@mihomo_router.get("/external-controller")
async def get_controller_urls():
    """Return the dashboard URLs of all rows that have a pid recorded.

    Rows without an external controller address are skipped.
    """
    running_list = await get_mihomo_running_status()
    logger.info(f"running_list {running_list}")
    # Example: https://yacd.metacubex.one/?hostname=127.0.0.1&port=9351&secret=#/proxies
    return [
        item.external_controller_url
        for item in running_list
        if item.external_controller_url
    ]


async def stop_all_mihomo():
    """Stop every row reported by ``get_mihomo_running_status`` that has a pid."""
    running_list = await get_mihomo_running_status()
    for item in running_list:
        if item.pid:
            logger.info(f"stop mihomo {item}")
            await post_stop_mihomo(MihomoBatchRequest(id=item.id))