"""Helpers for subscription files, YAML configs, local ports and downloads."""

import asyncio
import os
from contextlib import closing, suppress
from datetime import datetime
from pathlib import Path
from socket import socket, AF_INET, SOCK_STREAM
from typing import Optional, Dict, Any

import httpx
import yaml
from fastapi import HTTPException


def _load_yaml(path) -> Any:
    """Parse the YAML file at *path* with ``yaml.safe_load`` and return the result."""
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


async def _close_writer(writer) -> None:
    """Close an asyncio stream writer, ignoring socket errors raised while closing."""
    writer.close()
    # The peer may reset the connection while we close it; that says nothing
    # about whether the port is in use, so it must not leak to the caller.
    with suppress(OSError):
        await writer.wait_closed()


def get_sub_file_info(file_path: str):
    """Read a subscription YAML file and return its key sections.

    Args:
        file_path: Path of the subscription file.

    Returns:
        A tuple ``(name, groups, proxies)`` where ``groups`` is the
        ``proxy-groups`` list, ``name`` is the ``name`` of its first entry and
        ``proxies`` is the ``proxies`` list.

    Raises:
        ValueError: If the file is empty / not a mapping, or if
            ``proxy-groups``, the first group's ``name`` or ``proxies`` is
            missing or empty.
    """
    sub_yaml = _load_yaml(file_path)
    # An empty file parses to None and a scalar file to a str/int; neither has
    # .get(), so reject them with the same error as any other invalid file.
    if not isinstance(sub_yaml, dict):
        raise ValueError("subscription file is not valid")

    groups = sub_yaml.get("proxy-groups", [])
    if not groups:
        raise ValueError("subscription file is not valid")

    name = groups[0].get("name", "")
    if not name:
        raise ValueError("subscription file is not valid")

    proxies = sub_yaml.get("proxies", [])
    if not proxies:
        raise ValueError("subscription file is not valid")

    # NOTE(review): the previous version also built a list of proxies whose
    # name did not contain any of '流量', '套餐', '剩余', '测试', but never
    # used it. The unfiltered list is still returned so callers see no change;
    # confirm whether filtering was intended.
    return name, groups, proxies


async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
    """Query the controller's delay endpoint for a proxy group.

    Sends a GET to ``http://{external_controller}/group/{provider_name}/delay``
    using ``https://www.gstatic.com/generate_204`` as the probe URL and
    ``timeout=2000`` as the probe timeout (presumably milliseconds - confirm
    against the controller API).

    Args:
        provider_name: Name of the proxy group to test.
        external_controller: ``host:port`` of the controller.

    Returns:
        The decoded JSON body on success. According to the original author's
        notes this is a mapping of proxy name to delay, e.g.
        ``{'auto': 34, 'node-1': 159}``, and the controller reports a total
        failure as ``{'message': 'get delay: all proxies timeout'}``.
        Any exception (connection error, HTTP error status, invalid JSON) is
        reported as ``{"error": "<message>"}`` instead of being raised.
    """
    url = f"http://{external_controller}/group/{provider_name}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberate best-effort: callers get an error payload, not an exception.
            return {"error": str(e)}


def get_provider_name(sub_file: Path | str) -> Optional[str]:
    """Return the name of the first proxy group in a subscription file.

    Args:
        sub_file: Path of the subscription YAML file.

    Returns:
        The ``name`` of the first ``proxy-groups`` entry, or ``None`` when the
        file is empty / not a mapping, has no groups, or the name is empty.
    """
    sub_yaml = _load_yaml(sub_file)
    if not isinstance(sub_yaml, dict):
        return None

    groups = sub_yaml.get("proxy-groups", [])
    if not groups:
        return None

    name = groups[0].get("name", "")
    return name or None


def get_external_controller(config_path: Path | str) -> Optional[str]:
    """Return the ``external-controller`` value from a config file.

    Args:
        config_path: Path of the YAML config file.

    Returns:
        The value stored under ``external-controller``, or ``None`` when the
        key is absent or the file is empty / not a mapping.
    """
    config = _load_yaml(config_path)
    if not isinstance(config, dict):
        return None
    return config.get("external-controller")


def save_yaml_dump(config: dict, save_as: Path) -> Path:
    """Write *config* to *save_as* as YAML and return the path.

    Parent directories are created if needed. The dump keeps key order,
    writes non-ASCII characters verbatim and uses a two-space indent.

    Args:
        config: Mapping to serialize.
        save_as: Destination file.

    Returns:
        The destination path.
    """
    save_as = Path(save_as)
    save_as.parent.mkdir(parents=True, exist_ok=True)
    with open(save_as, "w", encoding="utf-8") as f:
        yaml.dump(
            config,
            f,
            Dumper=yaml.SafeDumper,
            allow_unicode=True,
            indent=2,
            sort_keys=False,
        )
    return save_as


async def async_get_sub(sub_url: str, save_path: Path, timeout: int = 10) -> Path:
    """Download a subscription file and store it at *save_path*.

    The request follows redirects and identifies itself with a
    ``clash-verge/v1.7.5`` User-Agent.

    Args:
        sub_url: URL of the subscription.
        save_path: Destination file; parent directories are created.
        timeout: Request timeout passed to httpx.

    Returns:
        The destination path.

    Raises:
        HTTPException: With status 500 when the request fails or the server
            answers with an error status.
    """
    headers = {'User-Agent': 'clash-verge/v1.7.5'}
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                sub_url, headers=headers, follow_redirects=True, timeout=timeout
            )
            resp.raise_for_status()
    except httpx.HTTPError as e:
        raise HTTPException(status_code=500, detail=f"订阅获取失败: {str(e)}") from e

    save_path = Path(save_path)
    save_path.parent.mkdir(parents=True, exist_ok=True)
    with open(save_path, "w", encoding="utf-8") as f:
        f.write(resp.text)
    return save_path


async def get_sub(sub_url: str, save_path: Path, timeout: int = 10) -> Path:
    """Download a subscription file; alias of :func:`async_get_sub`.

    Kept for callers that use the shorter name. Arguments, return value and
    raised exceptions are those of :func:`async_get_sub`.
    """
    return await async_get_sub(sub_url, save_path, timeout)


def update_config(
    read_path: Path,
    config_update: dict,
    save_as: Optional[Path] = None,
) -> Path:
    """Merge *config_update* into a YAML config file and save the result.

    Args:
        read_path: Existing config to start from. A missing or empty file is
            treated as an empty config.
        config_update: Top-level keys to set; existing keys are overwritten.
        save_as: Destination file. Defaults to *read_path*.

    Returns:
        The path the merged config was written to.
    """
    read_path = Path(read_path)
    config: Dict[str, Any] = {}
    if read_path.exists():
        config = _load_yaml(read_path) or {}

    config.update(config_update)
    # The previous version dropped this return value and yielded None despite
    # the declared ``-> Path``.
    return save_yaml_dump(config, save_as or read_path)


async def find_free_port(scope=(9350, 18000)):
    """Return the first port in *scope* that nothing on 127.0.0.1 accepts connections on.

    Each port is probed by attempting a TCP connection; a failed connection is
    taken to mean the port is free.

    Args:
        scope: Arguments for ``range()`` describing the ports to try, i.e.
            ``(start, stop)`` with ``stop`` exclusive. ``None`` selects the
            default ``(9350, 18000)``.

    Returns:
        The first port whose probe connection failed.

    Raises:
        OSError: If every port in the range accepted a connection.
    """
    if scope is None:
        scope = (9350, 18000)

    for port in range(*scope):
        try:
            _reader, writer = await asyncio.open_connection('127.0.0.1', port)
        except OSError:
            # Covers ConnectionRefusedError: nobody is listening on this port.
            return port
        # Connected, so the port is taken; release the probe and keep looking.
        await _close_writer(writer)
    raise OSError('未找到可用端口。')


async def port_is_using(port, ip='127.0.0.1', timeout=1.0):
    """Report whether something accepts TCP connections on ``ip:port``.

    Args:
        port: Port to probe.
        ip: Address to probe.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        ``True`` if the connection was established, ``False`` if it was
        refused, failed with another socket error, or timed out.
    """
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, port),
            timeout=timeout,
        )
    except (OSError, asyncio.TimeoutError):
        # A timeout is treated the same as a refused connection.
        return False
    await _close_writer(writer)
    return True


async def download_mihomo(download_path: Path, download_url: str):
    """Download the mihomo executable unless it already exists.

    The file is written to *download_path* (parent directories are created)
    and its mode is set to ``0o755``. Nothing happens if the path exists.

    Args:
        download_path: Destination of the executable.
        download_url: URL to fetch it from.

    Raises:
        RuntimeError: If the download or any of the file operations fails;
            the original exception is attached as ``__cause__``.
    """
    download_path = Path(download_path)
    if download_path.exists():
        return
    try:
        # Use the async client: a blocking httpx.get() inside a coroutine
        # would stall the event loop for the whole download.
        async with httpx.AsyncClient() as client:
            resp = await client.get(download_url, timeout=30)
            resp.raise_for_status()
        download_path.parent.mkdir(parents=True, exist_ok=True)
        download_path.write_bytes(resp.content)
        download_path.chmod(0o755)
    except Exception as e:
        raise RuntimeError(f"下载mihomo失败: {str(e)}") from e