Browse Source

导入配置时自动加载模型对象;完善 proxy_mgr 的管理;

mrh 9 months ago
parent
commit
c148360193

+ 0 - 1
ui/backend/.gitignore

@@ -1 +0,0 @@
-t.py

+ 4 - 0
ui/backend/config.yaml

@@ -9,6 +9,10 @@ sub:
       file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9660.yaml
       name: null
       port: 9660
+    9662:
+      file_path: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp\9662.yaml
+      name: null
+      port: 9662
   start_port: 9660
   temp_dir: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\temp
   url: https://www.yfjc.xyz/api/v1/client/subscribe?token=b74f2207492053926f7511a8e474048f

+ 3 - 3
ui/backend/routers/proxy.py

@@ -58,12 +58,12 @@ def create_proxy(request):
 def start_mimo(exe_path: str, file_path: str):
     global process_manager
     command = [exe_path, "-f", file_path]
-    pid = process_manager.start_process(command)
+    pid = process_manager.start_process(f"mimo_{Path(file_path).stem}", command)
     return pid
 
-def stop_mimo():
+def stop_mimo(file_path: str):
     global process_manager
-    process_manager.stop_process("mimo")
+    return process_manager.stop_process(f"mimo_{Path(file_path).stem}")
 
 
 @router.get("/subs")

+ 61 - 44
ui/backend/src/services/proxy_manager.py

@@ -1,57 +1,74 @@
 from pathlib import Path
+import random
 from typing import Optional
+
+import httpx
 from utils.process_mgr import process_manager
+from utils.mihomo import get_external_controller, get_provider_name, get_sub_file_info, async_get_sub, async_proxy_delay
 import yaml
 
 class ProxyManager:
-    """管理代理进程的生命周期,包括启动、停止和重启"""
-    
-    def start_proxy(self, exe_path: str, config_path: str) -> int:
-        """
-        启动代理进程
-        Args:
-            exe_path (str): 代理可执行文件路径
-            config_path (str): 配置文件路径
-        Returns:
-            int: 进程ID
-        """
-        command = [exe_path, "-f", config_path]
-        return process_manager.start_process(command, f"mimo_{Path(config_path).stem}")
+    prefix = "mimo_"
 
-    def stop_proxy(self, config_path: str):
-        """
-        停止代理进程
-        Args:
-            config_path (str): 配置文件路径
-        """
-        process_manager.stop_process(f"mimo_{Path(config_path).stem}")
+    def __init__(
+        self,
+        exe_path: str,
+        config_path: str,
+        process_manager=process_manager
+    ):
+        self.exe_path = Path(exe_path)
+        self.config_path = Path(config_path)
+        self.process_manager = process_manager
+        self.process_name = f"{self.prefix}{self.config_path.stem}"
+        self.external_controller = get_external_controller(self.config_path)
+        self.provider_name = get_provider_name(self.config_path)
 
-    def restart_proxy(self, exe_path: str, config_path: str) -> int:
-        """
-        重启代理进程
-        Args:
-            exe_path (str): 代理可执行文件路径
-            config_path (str): 配置文件路径
-        Returns:
-            int: 新的进程ID
+    async def start_proxy(self) -> int:
+        """启动代理进程"""
+        command = [str(self.exe_path), "-f", str(self.config_path)]
+        return await self.process_manager.start_process(self.process_name, command)
+
+    async def stop_proxy(self):
+        """停止代理进程"""
+        return await self.process_manager.stop_process(self.process_name)
+
+    async def restart_proxy(self) -> int:
+        """重启代理进程"""
+        await self.stop_proxy()
+        return await self.start_proxy()
+
+    def get_management_url(self) -> Optional[str]:
+            
+        host, port = self.external_controller.split(":")
+        return f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
+
+    async def ping_proxies(self):
+        """异步获取代理延迟
+        失败: {'message': 'get delay: all proxies timeout'}
+        成功: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}
         """
-        self.stop_proxy(config_path)
-        return self.start_proxy(exe_path, config_path)
+        url = f"http://{self.external_controller}/group/{self.provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
+        async with httpx.AsyncClient() as client:
+            response = await client.get(url, timeout=10)
+            response.raise_for_status()
+            return response.json()
 
-    def get_management_url(self, config_path: Path) -> Optional[str]:
+    async def select_proxy(self, name: str='') -> int:
         """
-        获取代理管理页面URL
-        Args:
-            config_path (Path): 配置文件路径
-        Returns:
-            Optional[str]: 管理页面URL,如果配置无效则返回None
+        选择代理,如果 name 为空,则随机选择一个代理
         """
-        with open(config_path, "r", encoding="utf-8") as f:
-            config = yaml.safe_load(f)
+        if not name:
+            proxy = await self.ping_proxies()
+            name = random.choice(list(proxy.keys()))
+        url = f"http://{self.external_controller}/proxies/{self.provider_name}"
+        payload = {"name": name}
         
-        if "external-controller" not in config:
-            return None
-            
-        host, port = config["external-controller"].split(":")
-        return f"https://yacd.metacubex.one/?hostname={host}&port={port}&secret=#/proxies"
-
+        async with httpx.AsyncClient() as client:
+            response = await client.put(url, json=payload)
+            response.raise_for_status()
+            return response.json()
+    @classmethod
+    def get_all_running_proxies(cls) -> list:
+        """获取所有正在运行的代理进程"""
+        return [p for p in process_manager.processes.values() 
+               if p["name"].startswith(cls.prefix)]

+ 26 - 4
ui/backend/src/services/subscription_manager.py

@@ -14,7 +14,9 @@ class SubscriptionManager:
     def __init__(self, config: Config=config):
         self.sub = config.sub
         self.config = config
-        self.proxies_mgr = ProxyManager()
+        self.list_proxies_mgr: Dict[int, ProxyManager] = {}
+        for proxy in self.sub.proxies.values():
+            self.list_proxies_mgr[proxy.port] = ProxyManager(self.config.mimo_exe, proxy.file_path)
     def save_config(self):
         """保存配置到文件"""
         return self.config.save()
@@ -51,21 +53,41 @@ class SubscriptionManager:
         config['bind-address'] = '127.0.0.1'
         res = update_config(Path(self.sub.file), config, Path(temp_path))
         self.sub.proxies.update({port: Proxy(file_path=str(temp_path), port=port)})
+        print("create_custom_config ", self.sub.proxies)
+        self.list_proxies_mgr[port] = ProxyManager(self.config.mimo_exe, temp_path)
         return self.save_config()
     
     def remove_custom_config(self, port: int):
         """
         删除自定义配置文件
         """
-        if port in self.sub.proxies:
+        proxy:Proxy = self.sub.proxies.get(port)
+        if proxy:
+            Path(proxy.file_path).unlink(missing_ok=True)
             del self.sub.proxies[port]
+            self.list_proxies_mgr[port].stop_proxy()
+            del self.list_proxies_mgr[port]
         return self.save_config()
     
-    def start_proxy(self, port: int):
+    async def start_proxy(self, port: int):
         """
         启动代理进程
         """
         proxy:Proxy = self.sub.proxies.get(port)
         if not proxy:
             return False
-        return self.proxies_mgr.start_proxy(self.config.mimo_exe, proxy.file_path)
+        self.list_proxies_mgr[port] = ProxyManager(self.config.mimo_exe, proxy.file_path)
+        return await self.list_proxies_mgr[port].start_proxy()
+    
+    async def stop_proxy(self, port: int):
+        """
+        停止代理进程
+        """
+        proxy:Proxy = self.sub.proxies.get(port)
+        if not proxy:
+            return False
+        self.list_proxies_mgr[port] = ProxyManager(self.config.mimo_exe, proxy.file_path)
+        return await self.list_proxies_mgr[port].stop_proxy()
+
+    async def ping_proxies(self, port: int):
+        return await self.list_proxies_mgr.get(port).ping_proxies()

+ 26 - 0
ui/backend/tests/mytests/t_sub_mgr.py

@@ -0,0 +1,26 @@
+from pathlib import Path
+import sys
+# 为了避免耦合,微服务,可能确实要将上级的上级目录作为一个单独的进程来处理,此目录作为一个单独的UI项目
+sys.path.append(str(Path(r'G:\code\upwork\zhang_crawl_bio\ui\backend')))
+from src.services.subscription_manager import SubscriptionManager
+from utils.config import config
+import asyncio
+from utils.logu import get_logger
+logger = get_logger('mytests', file=True)
+
+async def main():
+    logger.info(f"config: {config}")
+    sub_manager = SubscriptionManager(config)
+    # await sub_manager.download_subscription()
+    # await sub_manager.create_custom_config(port=9660)
+    # await sub_manager.create_custom_config(port=9662)
+    # logger.info(f"config: {config}")
+    print(sub_manager.sub)
+    await sub_manager.start_proxy(9660)
+    logger.info(f"{await sub_manager.ping_proxies(9660)}")
+    logger.info(f"{await sub_manager.ping_proxies(9662)}")
+    while True:
+        await asyncio.sleep(1)
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 8 - 2
ui/backend/utils/config.py

@@ -2,6 +2,7 @@ import yaml
 from pathlib import Path
 from pydantic import BaseModel
 from typing import List, Dict, Union,Optional,Any
+from utils.pydantic_auto_field import AutoLoadModel
 APP_PATH = Path(__file__).parent.parent
 CONFIG_PATH = APP_PATH / "config.yaml"
 REPO_BASE_DIR = Path(APP_PATH.parent.parent)
@@ -14,12 +15,17 @@ class Proxy(BaseModel):
     file_path: Optional[str] = None
 
 
-class Sub(BaseModel):
+class Sub(AutoLoadModel):
     url: Optional[str] = None
     start_port: Optional[int] = 9660  # Changed to int
     file: Optional[str] = None
     temp_dir: Optional[str] = str(PROXY_POLL_DIR / "temp")   
-    proxies: Optional[Dict[str, Any]] = {}
+    proxies: Optional[Dict[Union[int,str], Proxy]] = {}
+    def __init__(self, **data):
+            super().__init__(**data)
+            # Convert proxies dictionary values to Proxy objects
+            if self.proxies:
+                self.proxies = {k: Proxy(**v) if isinstance(v, dict) else v for k, v in self.proxies.items()}
 
 class Config(BaseModel):
     sub: Optional[Sub] = Sub()

+ 29 - 0
ui/backend/utils/mihomo.py

@@ -33,7 +33,36 @@ def get_sub_file_info(file_path: str):
             fileter_proxies.append(proxy)
     return name, groups,proxies
 
+async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
+    """异步获取代理延迟
+    失败: {'message': 'get delay: all proxies timeout'}
+    成功: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}
+    """
+    url = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
+    async with httpx.AsyncClient() as client:
+        try:
+            response = await client.get(url, timeout=10)
+            response.raise_for_status()
+            return response.json()
+        except Exception as e:
+            return {"error": str(e)}
+def get_provider_name(sub_file: Path|str) -> str:
+    with open(sub_file, "r",encoding='utf-8') as f:
+        sub_yaml = yaml.safe_load(f)
+    groups = sub_yaml.get("proxy-groups", [])
+    if not groups:
+        return
+    name = groups[0].get("name", "")
+    if not name:
+        return
+    return name
+def get_external_controller(config_path: Path|str) -> Optional[str]:
+    with open(config_path, "r", encoding="utf-8") as f:
+        config = yaml.safe_load(f)
 
+    if "external-controller" not in config:
+        return None
+    return config["external-controller"]
 
 def save_yaml_dump(config: dict, save_as: Path) -> Path:
     """保存配置文件"""

+ 1 - 0
ui/backend/utils/process_mgr.py

@@ -222,4 +222,5 @@ class ProcessManager:
             except RuntimeError:
                 pass
 
+process_manager = ProcessManager()
 

+ 47 - 0
ui/backend/utils/pydantic_auto_field.py

@@ -0,0 +1,47 @@
+from typing import Type, TypeVar, Dict, Any, Union, Optional
+from pydantic import BaseModel, model_validator
+
+# 定义一个通用类型变量,用于表示任意 Pydantic 模型
+ModelType = TypeVar("ModelType", bound=BaseModel)
+
+class ModelField:
+    """
+    通用的字段类型,用于将字典自动转换为指定的 Pydantic 模型对象。
+    """
+    def __init__(self, model_class: Type[ModelType]):
+        self.model_class = model_class
+
+    def __call__(self, value: Any) -> ModelType:
+        if isinstance(value, dict):
+            return self.model_class(**value)
+        elif isinstance(value, self.model_class):
+            return value
+        else:
+            raise ValueError(f"Expected dict or {self.model_class}, got {type(value)}")
+
+class AutoLoadModel(BaseModel):
+    """
+    基类,用于自动加载嵌套的 Pydantic 模型对象。
+    """
+    @model_validator(mode='before')
+    def auto_load_nested_models(cls, values: Dict[str, Any]) -> Dict[str, Any]:
+        for field_name, field in cls.model_fields.items():
+            field_type = field.annotation
+            if hasattr(field_type, "__origin__") and field_type.__origin__ is Union:
+                # 处理 Union 类型(如 Optional)
+                field_type = next(t for t in field_type.__args__ if t is not type(None))
+            if isinstance(field_type, type) and issubclass(field_type, BaseModel):
+                # 如果字段是 Pydantic 模型类型,则递归处理
+                field_value = values.get(field_name)
+                if isinstance(field_value, dict):
+                    values[field_name] = field_type(**field_value)
+                elif isinstance(field_value, list):
+                    values[field_name] = [field_type(**item) if isinstance(item, dict) else item for item in field_value]
+            elif isinstance(field_type, dict) and hasattr(field_type, "get") and callable(field_type.get):
+                # 处理 Dict 类型,检查值是否为 Pydantic 模型
+                field_value = values.get(field_name)
+                if isinstance(field_value, dict):
+                    for key, value in field_value.items():
+                        if isinstance(value, dict):
+                            values[field_name][key] = field_type(value)
+        return values