Prechádzať zdrojové kódy

完善代理并发启动,并发获取 ping 值,start_all_proxies_reachability 接口暂时不可用

mrh 10 mesiacov pred
rodič
commit
dc7c165ce7
1 zmenil súbory, kde vykonal 61 pridanie a 35 odobranie
  1. 61 35
      backend/routers/mihomo.py

+ 61 - 35
backend/routers/mihomo.py

@@ -6,7 +6,7 @@ from pathlib import Path
 from fastapi import APIRouter, HTTPException
 from datetime import datetime, timedelta
 from pydantic import BaseModel
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
 import httpx
 from sqlmodel import Session, select, or_,func
 import yaml
@@ -93,15 +93,15 @@ async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
         mixed_port = request.port
         if not mixed_port:
             mixed_port = await find_free_port()
-            external_controller_port = await find_free_port((mixed_port+1, 18000))
+        external_controller_port = await find_free_port((mixed_port+1, 18000))
         config = {}
         temp_path = settings.MIHOMO_TEMP_PATH / f"{miho_model.provider_name}_{external_controller_port}.yaml"
         config['mixed-port'] = mixed_port
         config['external-controller'] = f'127.0.0.1:{external_controller_port}'
         config['bind-address'] = '127.0.0.1'
-        logger.info(f"sub_file.file_path {sub_file.file_path}")
-        logger.info(f"temp_path {temp_path}")
-        logger.info(f"config {config}")
+        # logger.info(f"sub_file.file_path {sub_file.file_path}")
+        # logger.info(f"temp_path {temp_path}")
+        # logger.info(f"config {config}")
         res = update_config(Path(sub_file.file_path), config, Path(temp_path))
         
         try:
@@ -137,17 +137,37 @@ async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
             raise HTTPException(status_code=500, detail=str(e))
 
 @mihomo_router.post("/startup")
-async def post_start_each_provider() -> List[MihomoMetaWithURL]:
+async def post_start_each_provider():
     db = SubscriptionManager()
     results = db.get_each_provider_proxies()
-    ret = [] 
-    for provider_moho in results:
-        try:
-            res = await post_start_mihomo(MihomoBatchRequest(id=int(provider_moho.id)))
-            ret.append(res)
-        except Exception as e:
-            logger.exception(f"Failed to start mihomo: {str(e)}")
-            raise HTTPException(status_code=500, detail=str(e))
+    logger.info(f"{len(results)}")
+    port_start = 9350
+    
+    # 创建所有任务的列表,为每个任务分配不同的端口号
+    tasks = []
+    for idx, provider_moho in enumerate(results):
+        port = port_start + idx * 2
+        tasks.append(
+            post_start_mihomo(MihomoBatchRequest(
+                id=int(provider_moho.id),
+                port=port
+            ))
+        )
+    
+    # 并发执行所有任务,并允许任务抛出异常而不中断其他任务
+    results = await asyncio.gather(*tasks, return_exceptions=True)
+    
+    # 处理结果和错误
+    ret = []
+    for result in results:
+        if isinstance(result, Exception):
+            # 如果是异常,记录错误并添加到 ret 中
+            logger.error(f"Failed to start mihomo: {str(result)}")
+            ret.append({"error": str(result)})
+        else:
+            # 如果是正常结果,直接添加到 ret 中
+            ret.append(result)
+    
     return ret
 
 @mihomo_router.post("/stop")
@@ -193,35 +213,41 @@ async def get_mihomo_running_status():
 
 @mihomo_router.post("/proxies_reachability")
 async def get_proxies_reachability():
-    # 2. 获取所有运行中的代理
+    # 获取所有运行中的代理
     db = SubscriptionManager()
     running_proxies = db.get_each_provider_running_proxies()
     
-    # 3. 测试延迟并更新数据库
+    # 过滤掉没有 external_controller 的代理
+    valid_proxies = [proxy for proxy in running_proxies if proxy.external_controller]
+    
+    # 创建并发任务列表
+    tasks = [
+        async_proxy_delay(proxy.provider_name, proxy.external_controller)
+        for proxy in valid_proxies
+    ]
+    
+    # 并发执行所有延迟测试
+    delay_results = await asyncio.gather(*tasks, return_exceptions=True)
+    
+    # 处理结果并更新数据库
     results = []
-    for proxy in running_proxies:
-        if not proxy.external_controller:
-            continue
-            
-        try:
-            # 获取延迟数据
-            delay_data = await async_proxy_delay(proxy.provider_name, proxy.external_controller)
-            
-            db.update_proxy_delays(
-                provider_name=proxy.provider_name,
-                delays=delay_data
-            )
-            
-            # 收集结果
+    for proxy, delay_result in zip(valid_proxies, delay_results):
+        if isinstance(delay_result, Exception):
+            # 处理错误情况
+            logger.error(f"Failed to update delays for {proxy.provider_name}: {str(delay_result)}")
             results.append({
                 "provider": proxy.provider_name,
-                "delays": delay_data
+                "error": str(delay_result)
             })
-        except Exception as e:
-            logger.error(f"Failed to update delays for {proxy.provider_name}: {str(e)}")
+        else:
+            # 更新数据库并收集结果
+            db.update_proxy_delays(
+                provider_name=proxy.provider_name,
+                delays=delay_result
+            )
             results.append({
                 "provider": proxy.provider_name,
-                "error": str(e)
+                "delays": delay_result
             })
     
     return results
@@ -244,7 +270,7 @@ async def start_all_proxies_reachability():
     # 并发执行所有任务,并允许任务抛出异常而不中断其他任务
     results = await asyncio.gather(*tasks, return_exceptions=True)
     
-    # 处理结果和错误
+    # 处理结果和错误 
     ret = []
     for result in results:
         if isinstance(result, Exception):