| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import asyncio
- import httpx
- import sys
- from pathlib import Path
- from sqlmodel import Session, select
- sys.path.append(str(Path(__file__).parent.parent.absolute()))
- print(sys.path)
- from config.logu import get_logger
- from routers.mihomo import MihomoMetaWithURL
- from database.models.subscription import SubscriptionManager, MihomoMeta
- logger = get_logger('client',file=False)
- BASE_URL = "http://localhost:5010"
async def get_proxies() -> dict:
    """Fetch all running mihomo proxies from the subscriptions API.

    Sends a GET request to ``{BASE_URL}/subscriptions/proxies`` using a
    short-lived ``httpx.AsyncClient``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status
            (raised by ``raise_for_status``).
    """
    async with httpx.AsyncClient() as http:
        reply = await http.get(f"{BASE_URL}/subscriptions/proxies")
        reply.raise_for_status()
        # Decode while the client is still open, hand the data back afterwards.
        decoded = reply.json()
    return decoded
async def get_mihomo_proxies() -> list:
    """Fetch all running mihomo proxies.

    Performs a GET on ``{BASE_URL}/subscriptions/proxies`` and returns the
    decoded JSON body.

    NOTE(review): this hits the same endpoint as ``get_proxies``, which is
    annotated as returning a ``dict``; the ``list`` annotation here may be
    stale -- confirm against the server's response shape.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status.
    """
    endpoint = f"{BASE_URL}/subscriptions/proxies"
    async with httpx.AsyncClient() as session:
        answer = await session.get(endpoint)
        answer.raise_for_status()
        body = answer.json()
    return body
def startup():
    """Synchronously POST to ``{BASE_URL}/mihomo/startup``.

    This is a blocking call (plain ``httpx.post``) with a 30 second timeout.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status.
    """
    reply = httpx.post(f"{BASE_URL}/mihomo/startup", timeout=30)
    # Fail loudly on a 4xx/5xx instead of decoding an error payload.
    reply.raise_for_status()
    return reply.json()
async def start_proxy(proxy_id: int, port: int = None):
    """Start a single proxy through ``{BASE_URL}/mihomo/start``.

    Args:
        proxy_id: Identifier sent as ``"id"`` in the JSON request body.
        port: Optional port sent as ``"port"``. It is only included when
            truthy, so ``None`` and ``0`` both leave it out of the request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: Any error from the request, the status check or the JSON
            decoding is logged and then re-raised unchanged.
    """
    body = {"id": proxy_id}
    if port:
        body["port"] = port

    endpoint = f"{BASE_URL}/mihomo/start"
    async with httpx.AsyncClient() as http:
        try:
            reply = await http.post(endpoint, json=body, timeout=30)
            reply.raise_for_status()
            outcome = reply.json()
        except Exception as e:
            # Record which proxy failed, then let the caller decide what to do.
            logger.error(f"启动代理 {proxy_id} 失败: {str(e)}")
            raise
        return outcome
async def proxies_reachability() -> list:
    """Ask the server to test the latency of all running proxies.

    POSTs to ``{BASE_URL}/mihomo/proxies_reachability`` with a 30 second
    timeout and logs how many entries came back.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status.
    """
    endpoint = f"{BASE_URL}/mihomo/proxies_reachability"
    async with httpx.AsyncClient() as http:
        reply = await http.post(endpoint, timeout=30)
        reply.raise_for_status()
        results = reply.json()
        # Only the size is logged; the full payload can be large.
        logger.info(f"results {len(results)}")
        return results
async def start_proxies(max_count: int = 10, max_delay: int = 2000):
    """Boot the mihomo service, probe every proxy and start the responsive ones.

    The sequence is: ``startup()``, then ``proxies_reachability()``, then
    ``get_proxies()``. The returned mapping is walked provider by provider
    and every proxy whose ``"delay"`` is set and below ``max_delay`` is
    started via ``start_proxy`` until ``max_count`` proxies have been started.

    Args:
        max_count: Upper bound on how many proxies to start. Defaults to 10,
            the value that used to be hard-coded. A value <= 0 starts none.
        max_delay: Exclusive upper bound for a proxy's reported ``"delay"``.
            Defaults to 2000 (presumably milliseconds -- confirm against
            the server).

    Raises:
        Exception: Anything raised by the helper calls propagates, including
            the first ``start_proxy`` failure, which aborts the remaining
            starts.
        KeyError: If a proxy record lacks ``"id"`` or ``"delay"``.
    """
    # Start all proxies on the server side. The response body is not needed.
    # NOTE: startup() is a synchronous HTTP call, so it blocks the event loop
    # while it runs; acceptable for this one-shot script.
    startup()

    # Let the server measure the latency of every proxy before listing them.
    await proxies_reachability()
    proxies_by_provider = await get_proxies()

    if max_count <= 0:
        return

    started = 0
    for provider_proxies in proxies_by_provider.values():
        for proxy in provider_proxies:
            proxy_id = proxy["id"]
            delay = proxy["delay"]
            if delay is not None and delay < max_delay:
                logger.info(f"{proxy_id} {proxy}")
                await start_proxy(proxy_id)
                started += 1
                if started >= max_count:
                    return
async def get_random_proxy():
    """Request a proxy from ``{BASE_URL}/get`` and log how to try it out.

    The response JSON must contain a ``"port"`` key. That port is combined
    with ``127.0.0.1`` into an HTTP proxy address, and a ready-to-paste
    ``curl`` command using it is written to the log.
    NOTE(review): the name suggests the server picks the proxy at random;
    that selection is not visible from this client.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server replies with an error status.
        KeyError: If the response has no ``"port"`` entry.
    """
    endpoint = f"{BASE_URL}/get"
    async with httpx.AsyncClient() as http:
        reply = await http.get(endpoint, timeout=30)
        reply.raise_for_status()
        results = reply.json()
        logger.info(f"results {results}")

        # Build the local proxy address and log a command for a manual check.
        port = results["port"]
        addr = f'http://127.0.0.1:{port}'
        logger.info(f"curl -i -x {addr} https://www.google.com")
        return results
async def start_by_ids(proxy_ids: list):
    """Start the given proxies one after another.

    Each id is passed to ``start_proxy`` and awaited before the next one is
    attempted, so the requests are sequential and the first failure
    propagates, skipping the remaining ids.

    Args:
        proxy_ids: Proxy identifiers to start, in order.
    """
    for current_id in proxy_ids:
        await start_proxy(current_id)
async def main():
    """Entry coroutine of the script: start a fixed set of proxies by id."""
    # Alternative flows, kept for switching by hand:
    #   await start_proxies()
    #   await get_random_proxy()
    target_ids = [26, 27, 28, 38, 39, 40, 41]
    await start_by_ids(target_ids)
-
if __name__ == "__main__":
    # Only run when executed as a script; drive main() on a fresh event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
|