| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import asyncio
- import os
- import subprocess
- import hashlib
- from pathlib import Path
- from fastapi import APIRouter, HTTPException
- from datetime import datetime, timedelta
- from pydantic import BaseModel
- from typing import Dict, List, Optional
- import httpx
- from sqlmodel import Session, select, or_
- import yaml
- import signal
- from asyncio import subprocess
- import multiprocessing
- from multiprocessing import Pipe, Process
- from config.logu import logger, get_logger
- from config.settings import settings
- from config.app_yaml import app_yaml, Subscription
- from routers.subscriptions import list_subscriptions,SubscriptionResponse
- from utils.mihomo_service import port_is_using,find_free_port
- from utils.sub import update_config
- from utils.processes_mgr import process_manager
- from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
# Module-level placeholder intended to hold a process pool; starts out unset.
POOL = None
# Router that the mihomo start/stop/status endpoints below are registered on.
mihomo_router = APIRouter()
class MihomoBatchRequest(BaseModel):
    """Request body of the start/stop endpoints in this module."""

    # Primary key of the MihomoMeta record to operate on.
    id: int
    # Requested mixed port; when omitted the start endpoint picks a free one.
    port: Optional[int] = None
class MihomoRunningStatus(MihomoBatchRequest):
    """MihomoBatchRequest fields plus a process id and a start timestamp."""

    pid: int
    started_at: datetime
class ProcessInfo(BaseModel):
    """Model carrying a single provider name."""

    provider_name: str
-
class MihomoResponse(MihomoBatchRequest):
    """MihomoBatchRequest fields plus an error code and optional detail."""

    # Defaults to 0.  NOTE(review): presumably 0 means "no error" -- confirm.
    error: int = 0
    detail: Optional[Dict] = None
class MihomoMetaWithURL(MihomoMeta, table=False):
    """Non-table variant of MihomoMeta that also carries a dashboard URL."""

    # Override the relationship field that causes problems.
    subscript_file: Optional[int] = None  # use the foreign-key type instead of the relationship object
    # Filled in by the endpoints below with a yacd dashboard link.
    external_controller_url: Optional[str] = None

    class Config:
        arbitrary_types_allowed = True
-
async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy_name: str, max_retries: int = 5, delay: float = 2.0) -> "Optional[httpx.Response]":
    """Ask the external controller to select ``proxy_name`` in ``provider_name``.

    Sends ``PUT http://{external_ctl}/proxies/{provider_name}`` with the JSON
    body ``{"name": proxy_name}`` and retries when httpx reports an error.

    Args:
        external_ctl: ``host:port`` of the external controller.
        provider_name: Name placed in the URL path; it is percent-encoded so
            that characters such as ``/``, ``?`` or ``#`` stay inside the
            path segment instead of altering the URL.
        proxy_name: Name sent in the request body.
        max_retries: Total number of attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        The httpx response of the first successful attempt (the endpoint has
        no payload of its own), or ``None`` when ``max_retries`` is below 1
        and no attempt is made.

    Raises:
        HTTPException: Status 500 once every attempt has failed.
    """
    from urllib.parse import quote

    # safe="" also escapes "/", which would otherwise split the path segment.
    encoded_name = quote(str(provider_name), safe="")
    url = f"http://{external_ctl}/proxies/{encoded_name}"
    payload = {"name": proxy_name}

    async with httpx.AsyncClient() as client:
        for attempt in range(max_retries):
            try:
                response = await client.put(url, json=payload)
                response.raise_for_status()  # turn 4xx/5xx answers into exceptions
                return response
            except httpx.HTTPError as e:  # RequestError is a subclass of HTTPError
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay)  # wait before the next attempt
                else:
                    raise HTTPException(status_code=500, detail=f"Failed to select proxy after {max_retries} attempts: {str(e)}") from e
    return None
@mihomo_router.post("/start")
async def post_start_mihomo(request: MihomoBatchRequest):
    """Start a mihomo process for the MihomoMeta record ``request.id``.

    Writes a temporary config derived from the record's subscription file
    with the chosen ports, launches the binary through ``process_manager``,
    selects the record's proxy through the external controller and stores
    the resulting pid/ports on the record.

    Args:
        request: Record id plus an optional mixed port; a free port is looked
            up when none is given.

    Returns:
        The stored record unchanged when it already has a pid; otherwise a
        ``MihomoMetaWithURL`` built from the updated record, including a yacd
        dashboard URL for its external controller.

    Raises:
        HTTPException: 404 when no record has that id; 500 when starting the
            process, selecting the proxy or saving the record fails.
    """
    db = SubscriptionManager()

    with Session(db.engine) as session:
        # Look up the record that should be started.
        miho_model = session.exec(
            select(MihomoMeta)
            .where(MihomoMeta.id == request.id)
        ).first()

        if not miho_model:
            raise HTTPException(status_code=404, detail="Provider not found")
        sub_file = miho_model.subscript_file
        # A stored pid is taken to mean "already started": return it as-is.
        if miho_model.pid:
            return miho_model

        # Use the requested port, or find a free one when none was given.
        mixed_port = request.port
        if not mixed_port:
            mixed_port = find_free_port()
        external_controller_port = find_free_port((mixed_port + 1, 18000))

        # Location of the temporary config file for this run.
        temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
        # Port/bind overrides applied on top of the subscription file.
        config = {
            'mixed-port': mixed_port,
            'external-controller': f'127.0.0.1:{external_controller_port}',
            'bind-address': '127.0.0.1',
        }
        logger.info(f"sub_file.file_path {sub_file.file_path}")
        logger.info(f"temp_path {temp_path}")
        logger.info(f"config {config}")
        update_config(Path(sub_file.file_path), config, Path(temp_path))

        # Launch the process.
        try:
            command = [str(settings.MIHOMO_BIN_PATH), "-f", str(temp_path)]
            logger.info(f"Executing command: {' '.join(command)}")

            pid = process_manager.start_process(command, external_controller_port)

            miho_model.mixed_port = mixed_port
            miho_model.external_controller = f'127.0.0.1:{external_controller_port}'
            miho_model.temp_file_path = str(temp_path)
            miho_model.pid = pid
            miho_model.running = True
            miho_model.updated_at = datetime.now()
            try:
                await request_select_proxy_name(miho_model.external_controller, miho_model.provider_name, miho_model.proxy_name)
            except Exception as e:
                logger.error(f"Failed to select proxy: {str(e)}")
                # The process is useless without its proxy selected: stop it.
                process_manager.stop_process(external_controller_port)
                if isinstance(e, HTTPException):
                    # Already a complete HTTP error; keep its detail intact.
                    raise
                raise HTTPException(status_code=500, detail=str(e)) from e

            # Persist the updated record.
            session.add(miho_model)
            session.commit()
            session.refresh(miho_model)
            mihomo_with_url = MihomoMetaWithURL(**miho_model.model_dump())
            if miho_model.external_controller:
                host, port = miho_model.external_controller.split(":")
                mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
            return mihomo_with_url

        except HTTPException:
            # Raised (and logged) above; re-wrapping it would stringify the
            # exception into a new detail and log the same failure twice.
            raise
        except Exception as e:
            logger.exception(f"Failed to start mihomo: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e)) from e
@mihomo_router.post("/stop")
async def post_stop_mihomo(request: MihomoBatchRequest):
    """Stop the mihomo process recorded for the MihomoMeta row ``request.id``.

    Returns the refreshed record with ``pid`` cleared and ``running`` set to
    False.

    Raises:
        HTTPException: 404 when no record has that id, 400 when the record
            has no pid, 500 when ``process_manager.stop_process`` fails.
    """
    manager = SubscriptionManager()
    with Session(manager.engine) as session:
        lookup = select(MihomoMeta).where(MihomoMeta.id == request.id)
        provider = session.exec(lookup).first()

        if not provider:
            logger.error(f"Provider not found with id {request.id}")
            raise HTTPException(status_code=404, detail="Provider not found")
        if not provider.pid:
            raise HTTPException(status_code=400, detail="Provider is not running")

        # NOTE(review): post_start_mihomo passes the integer controller port
        # to start_process/stop_process, while this call passes the
        # "host:port" string -- confirm stop_process accepts both forms.
        try:
            process_manager.stop_process(provider.external_controller)
        except Exception as exc:
            logger.error(f"Failed to stop mihomo: {str(exc)}")
            raise HTTPException(status_code=500, detail=str(exc))

        # Mark the record as stopped and persist it.
        provider.pid = None
        provider.running = False
        provider.updated_at = datetime.now()
        session.add(provider)
        session.commit()
        session.refresh(provider)
        return provider
@mihomo_router.get("/")
async def get_mihomo_running_status():
    """List every MihomoMeta record that currently has a pid stored.

    Each record is copied into a ``MihomoMetaWithURL``; when the record has
    an external controller address, a yacd dashboard URL for it is attached.
    """
    manager = SubscriptionManager()
    with Session(manager.engine) as session:
        running_query = select(MihomoMeta).where(MihomoMeta.pid.is_not(None))
        running_models = session.exec(running_query).all()

        statuses = []
        for model in running_models:
            status = MihomoMetaWithURL(**model.model_dump())
            controller = model.external_controller
            if controller:
                # Stored as "host:port"; the dashboard wants them separately.
                host, port = controller.split(":")
                status.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
            statuses.append(status)

        return statuses
@mihomo_router.get("/external-controller")
async def get_controller_urls():
    """Return the yacd dashboard URL of every running mihomo instance.

    The URLs look like
    ``https://yacd.metacubex.one/?hostname=127.0.0.1&port=9351&secret=#/proxies``
    and are the ones ``get_mihomo_running_status`` already attaches to each
    entry.  Entries without an external controller address carry no URL and
    are skipped instead of failing the whole request.
    """
    running_list = await get_mihomo_running_status()
    logger.info(f"running_list {running_list}")
    return [
        item.external_controller_url
        for item in running_list
        if item.external_controller_url
    ]
async def stop_all_mihomo():
    """Stop every mihomo instance reported by ``get_mihomo_running_status``.

    Instances are stopped one after another through ``post_stop_mihomo``; an
    exception raised for one instance propagates to the caller.
    """
    for status in await get_mihomo_running_status():
        if not status.pid:
            continue
        logger.info(f"stop mihomo {status}")
        await post_stop_mihomo(MihomoBatchRequest(id=status.id))
|