import asyncio import httpx import sys from pathlib import Path from sqlmodel import Session, select sys.path.append(str(Path(__file__).parent.parent.absolute())) print(sys.path) from config.logu import get_logger from routers.mihomo import MihomoMetaWithURL from database.models.subscription import SubscriptionManager, MihomoMeta logger = get_logger('client',file=False) BASE_URL = "http://localhost:5010" async def get_proxies() -> dict: """获取所有运行中的mihomo代理""" url = f"{BASE_URL}/subscriptions/proxies" async with httpx.AsyncClient() as client: response = await client.get(url) response.raise_for_status() return response.json() async def get_mihomo_proxies() -> list: """获取所有运行中的mihomo代理""" url = f"{BASE_URL}/subscriptions/proxies" async with httpx.AsyncClient() as client: response = await client.get(url) response.raise_for_status() return response.json() def startup(): url = f"{BASE_URL}/mihomo/startup" response = httpx.post(url,timeout=30) response.raise_for_status() # 确保请求成功 return response.json() async def start_proxy(proxy_id: int, port: int=None): """启动单个代理""" url = f"{BASE_URL}/mihomo/start" payload = { "id": proxy_id, } if port: payload["port"] = port async with httpx.AsyncClient() as client: try: response = await client.post(url, json=payload, timeout=30) response.raise_for_status() return response.json() except Exception as e: logger.error(f"启动代理 {proxy_id} 失败: {str(e)}") raise async def proxies_reachability() -> list: """测试所有运行中代理的延迟并启动可用代理""" url = f"{BASE_URL}/mihomo/proxies_reachability" async with httpx.AsyncClient() as client: response = await client.post(url, timeout=30) response.raise_for_status() results = response.json() logger.info(f"results {len(results)}") return results async def start_proxies(): # 启动所有代理 res = startup() # 测试所有代理的延迟 await proxies_reachability() res = await get_proxies() count = 10 for key in res.keys(): provider_proxies = res[key] for proxy in provider_proxies: proxy_id = proxy["id"] if proxy["delay"] is not None and proxy["delay"] < 2000: logger.info(f"{proxy_id} {proxy}") await start_proxy(proxy_id) count -= 1 if count == 0: return async def get_random_proxy(): """测试所有运行中代理的延迟并启动可用代理""" url = f"{BASE_URL}/get" async with httpx.AsyncClient() as client: response = await client.get(url, timeout=30) response.raise_for_status() results = response.json() logger.info(f"results {results}") port = results["port"] addr = f'http://127.0.0.1:{port}' logger.info(f"curl -i -x {addr} https://www.google.com") return results async def start_by_ids(proxy_ids: list): for proxy_id in proxy_ids: await start_proxy(proxy_id) async def main(): # await start_proxies() # await get_random_proxy() await start_by_ids([26,27,28,38,39,40,41]) if __name__ == "__main__": asyncio.run(main())