浏览代码

完成 client 启动和随机获取代理

mrh 10 月之前
父节点
当前提交
09b8f2b3ed
共有 5 个文件被更改,包括 146 次插入47 次删除
  1. 1 1
      CONVENTIONS.md
  2. 17 8
      backend/database/models/subscription.py
  3. 76 18
      backend/demo/client.py
  4. 14 2
      backend/main.py
  5. 38 18
      backend/routers/mihomo.py

+ 1 - 1
CONVENTIONS.md

@@ -21,7 +21,7 @@
 
 - 当前环境是 python 3.12 ,务必要保持最新的接口来开发,例如 Fastapi 不再使用 app.event ,而是使用 lifespan 。pydantic.BaseModel 不再支持 dict() ,而是用 model_dump()
 
-重要:由于你是在 aider 开发环境中,如果你要编写任何文件的代码,都不能省略已有代码,必须完整写完
+- 必须使用搜索替换功能来编辑代码
 
 # 项目说明:
 - 这是一个基于 Fastapi + Vue3 的代理池管理。通过订阅链接,获取服务商提供的代理池数据,让 mihomo 工具启动本地代理池。

+ 17 - 8
backend/database/models/subscription.py

@@ -1,6 +1,6 @@
 from datetime import datetime
 from typing import Dict, List, Optional
-from sqlmodel import SQLModel, Field,Session,select,Relationship,func
+from sqlmodel import SQLModel, Field,Session,select,Relationship,func,update
 from sqlalchemy.engine import Engine
 from sqlalchemy.dialects.postgresql import JSON
 from pydantic import BaseModel
@@ -95,15 +95,24 @@ class SubscriptionManager:
     def update_proxy_delays(self, provider_name: str, delays: dict):
         """更新指定服务商下所有代理的延迟"""
         with Session(self.engine) as session:
-            proxies = session.exec(
-                select(MihomoMeta)
+            # 先清空该provider下所有代理的延迟值
+            session.exec(
+                update(MihomoMeta)
                 .where(MihomoMeta.provider_name == provider_name)
-            ).all()
+                .values(delay=None)
+            )
             
-            for proxy in proxies:
-                if proxy.proxy_name in delays:
-                    proxy.delay = delays[proxy.proxy_name]
-                    session.add(proxy)
+            # 更新有延迟值的代理
+            for proxy_name, delay in delays.items():
+                if isinstance(delay, int):  # 确保只更新有效的延迟值
+                    session.exec(
+                        update(MihomoMeta)
+                        .where(
+                            (MihomoMeta.provider_name == provider_name) &
+                            (MihomoMeta.proxy_name == proxy_name)
+                        )
+                        .values(delay=delay)
+                    )
             
             session.commit()
 

+ 76 - 18
backend/demo/client.py

@@ -2,17 +2,30 @@ import asyncio
 import httpx
 import sys
 from pathlib import Path
+from sqlmodel import Session, select
 sys.path.append(str(Path(__file__).parent.parent.absolute()))
 print(sys.path)
 from config.logu import get_logger
 from routers.mihomo import MihomoMetaWithURL
+from database.models.subscription import SubscriptionManager, MihomoMeta
 logger = get_logger('client',file=False)
 BASE_URL = "http://localhost:5010"
 
-def get_proxies() -> dict:
+async def get_proxies() -> dict:
+    """获取所有运行中的mihomo代理"""
     url = f"{BASE_URL}/subscriptions/proxies"
-    response = httpx.get(url)
-    return response.json()
+    async with httpx.AsyncClient() as client:
+        response = await client.get(url)
+        response.raise_for_status()
+        return response.json()
+
+async def get_mihomo_proxies() -> list:
+    """获取所有运行中的mihomo代理"""
+    url = f"{BASE_URL}/subscriptions/proxies"
+    async with httpx.AsyncClient() as client:
+        response = await client.get(url)
+        response.raise_for_status()
+        return response.json()
 
 def startup():
     url = f"{BASE_URL}/mihomo/startup"
@@ -20,21 +33,66 @@ def startup():
     response.raise_for_status()  # 确保请求成功
     return response.json()
 
-def proxy_delay(provider_name, external_controller: str):
-    '''
-    失败: {'message': 'get delay: all proxies timeout'}
-    成功: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}
-    '''
-    # http://127.0.0.1:9351/group/FSCloud/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000
-    url = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
-    response = httpx.get(url,timeout=30)
-    return response.json()
+async def start_proxy(proxy_id: int, port: int=None):
+    """启动单个代理"""
+    url = f"{BASE_URL}/mihomo/start"
+    payload = {
+        "id": proxy_id,
+    }
+    if port:
+        payload["port"] = port
+    async with httpx.AsyncClient() as client:
+        try:
+            response = await client.post(url, json=payload, timeout=30)
+            response.raise_for_status()
+            return response.json()
+        except Exception as e:
+            logger.error(f"启动代理 {proxy_id} 失败: {str(e)}")
+            raise
+
+async def proxies_reachability() -> list:
+    """测试所有运行中代理的延迟并启动可用代理"""
+    url = f"{BASE_URL}/mihomo/proxies_reachability"
+    async with httpx.AsyncClient() as client:
+        response = await client.post(url, timeout=30)
+        response.raise_for_status()
+        results = response.json()
+        logger.info(f"results {len(results)}")
+        return results
 
-def main():
+async def start_proxies():
+    # 启动所有代理
     res = startup()
-    for item in res:
-        logger.info(f"服务商: {item}")
-        res = proxy_delay(item.get('provider_name'),item.get('external_controller'))
-        logger.info(f"延迟: {res}")
+    # 测试所有代理的延迟
+    await proxies_reachability()
+    res = await get_proxies()
+    count = 10
+    for key in res.keys():
+        provider_proxies =  res[key]
+        for proxy in provider_proxies:
+            proxy_id = proxy["id"]
+            if proxy["delay"] is not None and proxy["delay"] < 2000:
+                logger.info(f"{proxy_id} {proxy}")
+                await start_proxy(proxy_id)
+                count -= 1
+                if count == 0:
+                    return
+
+async def get_random_proxy():
+    """测试所有运行中代理的延迟并启动可用代理"""
+    url = f"{BASE_URL}/get"
+    async with httpx.AsyncClient() as client:
+        response = await client.get(url, timeout=30)
+        response.raise_for_status()
+        results = response.json()
+        logger.info(f"results {results}")
+        port = results["port"]
+        addr = f'http://127.0.0.1:{port}'
+        logger.info(f"curl -i -x {addr} https://www.google.com")
+        return results
+async def main():
+    await start_proxies()
+    await get_random_proxy()
+    
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())

+ 14 - 2
backend/main.py

@@ -1,3 +1,4 @@
+import random
 from fastapi import FastAPI
 from contextlib import asynccontextmanager
 import uvicorn
@@ -10,7 +11,7 @@ import asyncio
 import httpx
 from config.settings import settings
 from routers.subscriptions import router
-from routers.mihomo import mihomo_router,stop_all_mihomo
+from routers.mihomo import mihomo_router,stop_all_mihomo,get_mihomo_running_status
 from utils.mihomo_service import download_mihomo
 from database.engine import create_db_and_tables
 from aiomultiprocess import Pool
@@ -51,5 +52,16 @@ async def root():
         }
     }
 
+@app.get("/get")
+async def get_random_proxy():
+    results = await get_mihomo_running_status()
+    if not results:
+        return {"message": "No running mihomo instances", 'code': 404}
+    active_results = []
+    for result in results:
+        if result.delay is not None and result.delay < 2000:
+            active_results.append(result.mixed_port) 
+    return {"port": random.choice(active_results)}
+
 if __name__ == "__main__":
-    uvicorn.run("main:app", host=settings.HOST, port=settings.PORT, reload=settings.DEBUG)
+    uvicorn.run("main:app", host=settings.HOST, port=settings.PORT, reload=False)

+ 38 - 18
backend/routers/mihomo.py

@@ -8,7 +8,7 @@ from datetime import datetime, timedelta
 from pydantic import BaseModel
 from typing import Dict, List, Optional, Union
 import httpx
-from sqlmodel import Session, select, or_,func
+from sqlmodel import Session, select, or_,func, update
 import yaml
 import signal
 from asyncio import subprocess
@@ -43,9 +43,16 @@ class MihomoResponse(MihomoBatchRequest):
 class MihomoMetaWithURL(MihomoMeta, table=False):
     subscript_file: Optional[int] = None
     external_controller_url: Optional[str] = None
-
+    error: Optional[str] = None  # 新增错误信息字段
+    detail: Optional[dict] = None
     class Config:
         arbitrary_types_allowed = True
+
+class ProxiesReachabilityResponse(BaseModel):
+    provider_name: str
+    delays: Optional[Dict[str, int]] = None
+    error: Optional[str] = None
+    external_controller_url: Optional[str] = None
         
 async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy_name: str, max_retries: int = 5, delay: float = 2.0) -> Optional[dict]:
     url = f"http://{external_ctl}/proxies/{provider_name}"
@@ -64,7 +71,10 @@ async def request_select_proxy_name(external_ctl: str, provider_name: str, proxy
                     raise HTTPException(status_code=500, detail=f"Failed to select proxy after {max_retries} attempts: {str(e)}")
 
 async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
-    """异步获取代理延迟"""
+    """异步获取代理延迟
+    失败: {'message': 'get delay: all proxies timeout'}
+    成功: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, '🇦🇺澳大利亚悉尼2': 1577,...}
+    """
     url = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
     async with httpx.AsyncClient() as client:
         try:
@@ -75,7 +85,7 @@ async def async_proxy_delay(provider_name: str, external_controller: str) -> dic
             return {"error": str(e)}
 
 @mihomo_router.post("/start")
-async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
+async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL :
     db = SubscriptionManager()
     logger.info(f"{request}")
     with Session(db.engine) as session:
@@ -85,7 +95,7 @@ async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
         ).first()
         
         if not miho_model:
-            raise HTTPException(status_code=404, detail="Provider not found")
+            return MihomoMetaWithURL(error=1, detail="mihomo not found")
         sub_file = miho_model.subscript_file
             
         if miho_model.pid:
@@ -121,7 +131,7 @@ async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
             except Exception as e:
                 logger.error(f"Failed to select proxy: {str(e)}")
                 process_manager.stop_process(external_controller_port)
-                raise HTTPException(status_code=500, detail=str(e))
+                return MihomoMetaWithURL(error=1, detail=str(e))
             
             session.add(miho_model)
             session.commit()
@@ -134,7 +144,7 @@ async def post_start_mihomo(request: MihomoBatchRequest) -> MihomoMetaWithURL:
             
         except Exception as e:
             logger.exception(f"Failed to start mihomo: {str(e)}")
-            raise HTTPException(status_code=500, detail=str(e))
+            return MihomoMetaWithURL(error=1, detail=str(e))
 
 @mihomo_router.post("/startup")
 async def post_start_each_provider():
@@ -198,7 +208,7 @@ async def post_stop_mihomo(request: MihomoBatchRequest):
             raise HTTPException(status_code=400, detail="Provider is not running")
 
 @mihomo_router.get("/")
-async def get_mihomo_running_status():
+async def get_mihomo_running_status() -> List[MihomoMetaWithURL]:
     db = SubscriptionManager()
     all = db.get_running_proxies()
     result = []
@@ -211,10 +221,19 @@ async def get_mihomo_running_status():
     
     return result
 
-@mihomo_router.post("/proxies_reachability")
+@mihomo_router.post("/proxies_reachability", response_model=List[ProxiesReachabilityResponse])
 async def get_proxies_reachability():
-    # 获取所有运行中的代理
     db = SubscriptionManager()
+    
+    # 清空所有代理的延迟值
+    with Session(db.engine) as session:
+        session.exec(
+            update(MihomoMeta)
+            .values(delay=None)
+        )
+        session.commit()
+    
+    # 获取所有运行中的代理
     running_proxies = db.get_each_provider_running_proxies()
     
     # 过滤掉没有 external_controller 的代理
@@ -232,23 +251,24 @@ async def get_proxies_reachability():
     # 处理结果并更新数据库
     results = []
     for proxy, delay_result in zip(valid_proxies, delay_results):
+        response = ProxiesReachabilityResponse(
+            provider_name=proxy.provider_name,
+            external_controller_url=f"https://yacd.metacubex.one/?hostname=127.0.0.1&port={proxy.external_controller.split(':')[1]}&secret=#/proxies"
+        )
+        
         if isinstance(delay_result, Exception):
             # 处理错误情况
             logger.error(f"Failed to update delays for {proxy.provider_name}: {str(delay_result)}")
-            results.append({
-                "provider": proxy.provider_name,
-                "error": str(delay_result)
-            })
+            response.error = str(delay_result)
         else:
             # 更新数据库并收集结果
             db.update_proxy_delays(
                 provider_name=proxy.provider_name,
                 delays=delay_result
             )
-            results.append({
-                "provider": proxy.provider_name,
-                "delays": delay_result
-            })
+            response.delays = delay_result
+        
+        results.append(response)
     
     return results