Sfoglia il codice sorgente

完成单个 mihomo 启停和数据库更新

mrh 10 mesi fa
parent
commit
bea308e1a8
4 ha cambiato i file con 214 aggiunte e 68 eliminazioni
  1. 4 1
      backend/main.py
  2. 137 64
      backend/routers/mihomo.py
  3. 10 3
      backend/routers/subscriptions.py
  4. 63 0
      backend/utils/processes_mgr.py

+ 4 - 1
backend/main.py

@@ -10,10 +10,11 @@ import asyncio
 import httpx
 from config.settings import settings
 from routers.subscriptions import router
-from routers.mihomo import mihomo_router
+from routers.mihomo import mihomo_router,stop_all_mihomo
 from utils.mihomo_service import download_mihomo
 from database.engine import create_db_and_tables
 from aiomultiprocess import Pool
+from utils.processes_mgr import process_manager
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """应用生命周期管理"""
@@ -23,7 +24,9 @@ async def lifespan(app: FastAPI):
     create_db_and_tables()
     # 检查并下载mihomo
     await download_mihomo()
+    await stop_all_mihomo()
     yield
+    process_manager.stop_all_processes()
 
 app = FastAPI(lifespan=lifespan)
 

+ 137 - 64
backend/routers/mihomo.py

@@ -1,30 +1,34 @@
 import asyncio
+import os
+import subprocess
 import hashlib
+from pathlib import Path
 from fastapi import APIRouter, HTTPException
 from datetime import datetime, timedelta
 from pydantic import BaseModel
 from typing import Dict, List, Optional
 import httpx
-from sqlmodel import Session, select
+from sqlmodel import Session, select, or_
 import yaml
+import signal
+from asyncio import subprocess
+import multiprocessing
+from multiprocessing import Pipe, Process
 from config.logu import logger, get_logger
 from config.settings import settings
 from config.app_yaml import app_yaml, Subscription
 from routers.subscriptions import list_subscriptions,SubscriptionResponse
 from utils.mihomo_service import port_is_using,find_free_port
 from utils.sub import update_config
+from utils.processes_mgr import process_manager
 from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
-
 # 初始化全局变量来保存进程池
 POOL = None
-processes = []
-mihomo_running_status = {}
 mihomo_router = APIRouter()
 
 
 class MihomoBatchRequest(BaseModel):
-    provider_name: str
-    proxy_name: str
+    id: int
     port: Optional[int] = None
 
 class MihomoRunningStatus(MihomoBatchRequest):
@@ -38,23 +42,29 @@ class MihomoResponse(MihomoBatchRequest):
     error: int = 0
     detail: Optional[Dict] = None
 
-async def start_mihomo(bin_exe: str, config_yaml_path: str):
-    global POOL
-    process = await asyncio.create_subprocess_exec(
-            bin_exe,
-            "-f",
-            str(config_yaml_path),
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-    processes.append(process)  
-    return process
-
-async def stop_mihomo(process: asyncio.subprocess.Process):
-    process.terminate()
-    await process.wait()
-    processes.remove(process)  
+class MihomoMetaWithURL(MihomoMeta, table=False):
+    # 覆盖导致问题的关系字段
+    subscript_file: Optional[int] = None  # 使用外键类型替代关系对象
+    external_controller_url: Optional[str] = None
 
+    class Config:
+        arbitrary_types_allowed = True
+        
+async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy_name: str, max_retries: int = 5, delay: float = 2.0) -> Optional[dict]:
+    url = f"http://{external_ctl}/proxies/{provider_name}"
+    payload = {"name": proxy_name}
+    
+    async with httpx.AsyncClient() as client:
+        for attempt in range(max_retries):
+            try:
+                response = await client.put(url, json=payload)
+                response.raise_for_status()  # 如果请求失败,抛出异常
+                return response  # 该接口没有返回值,直接返回response
+            except (httpx.HTTPError, httpx.RequestError) as e:
+                if attempt < max_retries - 1:
+                    await asyncio.sleep(delay)  # 等待一段时间后重试
+                else:
+                    raise HTTPException(status_code=500, detail=f"Failed to select proxy after {max_retries} attempts: {str(e)}")
 @mihomo_router.post("/start")
 async def post_start_mihomo(request: MihomoBatchRequest):
     db = SubscriptionManager()
@@ -62,67 +72,130 @@ async def post_start_mihomo(request: MihomoBatchRequest):
     # 获取对应的订阅文件
     with Session(db.engine) as session:
         # 查找对应的订阅文件
-        sub_file = session.exec(
-            select(SubscriptFile)
-            .where(SubscriptFile.name == request.provider_name)
-        ).first()
-        
-        if not sub_file:
-            raise HTTPException(status_code=404, detail="Provider not found")
-            
-        # 查找对应的代理配置
-        proxy = session.exec(
+        miho_model = session.exec(
             select(MihomoMeta)
-            .where(MihomoMeta.provider_name == request.provider_name)
-            .where(MihomoMeta.proxy_name == request.proxy_name)
+            .where(MihomoMeta.id == request.id)
         ).first()
         
-        if not proxy:
-            raise HTTPException(status_code=404, detail="Proxy not found")
-            
+        if not miho_model:
+            raise HTTPException(status_code=404, detail="Provider not found")
+        sub_file = miho_model.subscript_file
+        # logger.info(f"miho_model.subscript_file {miho_model.subscript_file}")    
+        # return miho_model
+        if miho_model.pid:
+            return miho_model
+        mixed_port = request.port
         # 如果端口未指定,查找可用端口
-        if not request.port:
-            request.port = find_free_port()
+        if not mixed_port:
+            mixed_port = find_free_port()
+            external_controller_port = find_free_port((mixed_port+1, 18000))
         config = {}
         # 保存临时配置文件
-        temp_path = settings.MIHOMO_TEMP_PATH / f"{request.provider_name}_{request.proxy_name}.yaml"
+        temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
         # 更新端口配置
-        config['mixed-port'] = request.port
-        config['external-controller'] = f'127.0.0.1:{request.port}'
+        config['mixed-port'] = mixed_port
+        config['external-controller'] = f'127.0.0.1:{external_controller_port}'
+        config['bind-address'] = '127.0.0.1'
+        logger.info(f"sub_file.file_path {sub_file.file_path}")
+        logger.info(f"temp_path {temp_path}")
+        logger.info(f"config {config}")
+        res = update_config(Path(sub_file.file_path), config, Path(temp_path))
         
-        update_config(sub_file.file_path, config, temp_path)
-            
         # 启动进程
         try:
-            process = await start_mihomo(settings.MIHOMO_BIN_PATH, temp_path)
+            command = [str(settings.MIHOMO_BIN_PATH), "-f", str(temp_path)]
+            logger.info(f"Executing command: {' '.join(command)}")
+            
+            pid = process_manager.start_process(command, external_controller_port)
             
+            miho_model.mixed_port = mixed_port
+            miho_model.external_controller = f'127.0.0.1:{external_controller_port}'
+            miho_model.temp_file_path = str(temp_path)
+            miho_model.pid = pid
+            miho_model.running = True
+            miho_model.updated_at = datetime.now()
+            try:
+                await request_select_proxy_name(miho_model.external_controller, miho_model.provider_name, miho_model.proxy_name)
+            except Exception as e:
+                logger.error(f"Failed to select proxy: {str(e)}")
+                process_manager.stop_process(external_controller_port)
+                raise HTTPException(status_code=500, detail=str(e))
             # 更新数据库记录
-            proxy.mixed_port = request.port
-            proxy.external_controller = f'127.0.0.1:{request.port}'
-            proxy.temp_file_path = str(temp_path)
-            proxy.pid = process.pid
-            proxy.running = True
-            proxy.updated_at = datetime.now()
             
-            session.add(proxy)
+            session.add(miho_model)
             session.commit()
-            
-            return {
-                "provider_name": request.provider_name,
-                "proxy_name": request.proxy_name,
-                "port": request.port,
-                "pid": process.pid,
-                "status": "running"
-            }
+            session.refresh(miho_model)
+            mihomo_with_url = MihomoMetaWithURL(**miho_model.model_dump())
+            if miho_model.external_controller:
+                host, port = miho_model.external_controller.split(":")
+                mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
+            return mihomo_with_url
             
         except Exception as e:
-            logger.error(f"Failed to start mihomo: {str(e)}")
+            logger.exception(f"Failed to start mihomo: {str(e)}")
             raise HTTPException(status_code=500, detail=str(e))
 
+@mihomo_router.post("/stop")
+async def post_stop_mihomo(request: MihomoBatchRequest):
+    db = SubscriptionManager()
+    with Session(db.engine) as session:
+        selected_provider = session.exec(
+            select(MihomoMeta)
+            .where(MihomoMeta.id == request.id)
+        ).first()
+        if not selected_provider:
+            logger.error(f"Provider not found with id {request.id}")
+            raise HTTPException(status_code=404, detail="Provider not found")
+        if selected_provider.pid:
+            try:
+                process_manager.stop_process(selected_provider.external_controller)
+            except Exception as e:
+                logger.error(f"Failed to stop mihomo: {str(e)}")
+                raise HTTPException(status_code=500, detail=str(e))
+            selected_provider.pid = None
+            selected_provider.running = False
+            selected_provider.updated_at = datetime.now()
+            session.add(selected_provider)
+            session.commit()
+            session.refresh(selected_provider)
+            return selected_provider
+        else:
+            raise HTTPException(status_code=400, detail="Provider is not running")
+
 @mihomo_router.get("/")
-async def get_mihomo_running_status() -> Dict[str, List[MihomoMeta]]:
+async def get_mihomo_running_status():
     db = SubscriptionManager()
     with Session(db.engine) as session:
-        session.exec(select(MihomoMeta)).all()
+        all = session.exec(
+            select(MihomoMeta)
+            .where(MihomoMeta.pid.is_not(None))
+        ).all()
+    
+    result = []
+    for mihomo_model in all:
+        mihomo_with_url = MihomoMetaWithURL(**mihomo_model.model_dump())
+        if mihomo_model.external_controller:
+            host, port = mihomo_model.external_controller.split(":")
+            mihomo_with_url.external_controller_url = f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
+        result.append(mihomo_with_url)
+    
+    return result
+
+@mihomo_router.get("/external-controller")
+async def get_controller_urls():
+    running_list = await get_mihomo_running_status()
+    logger.info(f"running_list {running_list}")
+    # https://yacd.metacubex.one/?hostname=127.0.0.1&port=9351&secret=#/proxies
+    urls = []
+    for item in running_list:
+        host, port = item.external_controller.split(":")
+        urls.append(f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies")
+    return urls
 
-    return db.get_proxies_by_provider()
+async def stop_all_mihomo():
+    running_list = await get_mihomo_running_status()
+    for item in running_list:
+        if item.pid:
+            logger.info(f"stop mihomo {item}")
+            await post_stop_mihomo(MihomoBatchRequest(id=item.id))
+    # logger.info(f"running_list {running_list}")

+ 10 - 3
backend/routers/subscriptions.py

@@ -11,8 +11,8 @@ from config.logu import logger, get_logger
 from config.settings import settings
 from utils.sub import async_get_sub
 from database.engine import engine,get_session
-from sqlmodel import Session
-from database.models.subscription import SubscriptionManager,SubscriptFile
+from sqlmodel import Session, select
+from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
 router = APIRouter()
 
 
@@ -91,4 +91,11 @@ async def list_subscriptions() ->List[SubscriptFile]:
     ret = []
     db = SubscriptionManager()
     db_sub_models = db.get_subscription_meta()
-    return db_sub_models
+    return db_sub_models
+
+@router.get("/proxies")
+async def list_proxies() -> Dict[str, List[MihomoMeta]]:
+    db = SubscriptionManager()
+    with Session(db.engine) as session:
+        session.exec(select(MihomoMeta)).all()
+    return db.get_proxies_by_provider()

+ 63 - 0
backend/utils/processes_mgr.py

@@ -0,0 +1,63 @@
+import multiprocessing
+import subprocess
+import os
+import signal
+from typing import List, Dict
+from config.logu import logger, get_logger
+
+class ProcessManager:
+    def __init__(self):
+        self.processes: Dict[str, multiprocessing.Process] = {}
+
+    def start_process(self, command: str, process_name: str):
+        """启动一个子进程并记录它"""
+        if process_name in self.processes and self.processes[process_name].is_alive():
+            logger.info(f"Process {process_name} is already running.")
+            return
+
+        parent_conn, child_conn = multiprocessing.Pipe()
+        p = multiprocessing.Process(target=self._run_command, args=(command, child_conn))
+        p.start()
+        pid = parent_conn.recv()  # 获取子进程的PID
+        logger.info(f"Started process {process_name} with PID: {pid}")
+        self.processes[process_name] = p
+        return pid
+    def stop_process(self, process_name: str):
+        """停止指定名称的子进程"""
+        if process_name in self.processes and self.processes[process_name].is_alive():
+            p = self.processes[process_name]
+            try:
+                os.kill(p.pid, signal.CTRL_BREAK_EVENT)  # 向Windows上的子进程发送CTRL_BREAK事件
+            except Exception as e:
+                logger.info(f"Failed to send CTRL_BREAK_EVENT to process {p.pid}: {e}")
+            p.terminate()
+            p.join()
+            del self.processes[process_name]
+            logger.info(f"Stopped process {process_name}")
+
+    def stop_all_processes(self):
+        """停止所有子进程"""
+        for process_name in list(self.processes.keys()):
+            self.stop_process(process_name)
+
+    @staticmethod
+    def _run_command(command, conn):
+        """在子进程中运行命令并将PID通过管道返回给主进程"""
+        try:
+            process = subprocess.Popen(
+                command,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP  # 对于Windows有用
+            )
+            # 将子进程的PID发送给父进程
+            conn.send(process.pid)
+            conn.close()  # 关闭管道,避免阻塞
+            
+            # 持续运行,不调用 process.communicate()
+            process.wait()  # 等待子进程结束(如果需要)
+        except Exception as e:
+            conn.send((None, str(e)))
+            conn.close()
+
+process_manager = ProcessManager()