| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- from datetime import datetime
- import httpx
- from pathlib import Path
- import os
- from contextlib import closing
- from socket import socket, AF_INET, SOCK_STREAM
- import asyncio
- from contextlib import closing
- from socket import AF_INET, SOCK_STREAM
- import httpx
- import yaml
- from pathlib import Path
- from typing import Optional, Dict, Any
- from fastapi import HTTPException
def get_sub_file_info(file_path: str):
    """Read a subscription YAML file and return its core sections.

    Args:
        file_path: Path of the subscription file to parse.

    Returns:
        A ``(name, groups, proxies)`` tuple: the ``name`` of the first entry
        of ``proxy-groups``, the whole ``proxy-groups`` list, and the whole
        ``proxies`` list (unfiltered).

    Raises:
        ValueError: If the document is empty or not a mapping, or if
            ``proxy-groups``, the first group's ``name`` or ``proxies`` is
            missing or empty.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        sub_yaml = yaml.safe_load(f)

    # safe_load yields None for an empty file and may yield a list or scalar;
    # report those as an invalid subscription instead of an AttributeError.
    if not isinstance(sub_yaml, dict):
        raise ValueError("subscription file is not valid")

    groups = sub_yaml.get("proxy-groups", [])
    if not groups:
        raise ValueError("subscription file is not valid")

    name = groups[0].get("name", "")
    if not name:
        raise ValueError("subscription file is not valid")

    proxies = sub_yaml.get("proxies", [])
    if not proxies:
        raise ValueError("subscription file is not valid")

    # NOTE(review): this function used to build a second list that skipped
    # proxies whose name contains '流量', '套餐', '剩余' or '测试', but that
    # list was never returned. The unfiltered list is still what callers get;
    # confirm whether the filtered one was intended before changing this.
    return name, groups, proxies
async def async_proxy_delay(provider_name: str, external_controller: str) -> dict:
    """Ask the external controller for the delay of every proxy in a group.

    Issues ``GET http://{external_controller}/group/{provider_name}/delay``
    with ``https://www.gstatic.com/generate_204`` as the probe URL and a
    per-proxy ``timeout`` of 2000; the HTTP request itself is capped at 10 s.

    Args:
        provider_name: Name of the proxy group; percent-encoded before it is
            placed in the URL path.
        external_controller: ``host:port`` of the controller API.

    Returns:
        The decoded JSON body on success. On any failure (connection error,
        non-2xx status, invalid JSON) ``{"error": "<message>"}`` is returned
        instead of raising.

        Response shapes recorded by the original author (not verifiable from
        this file):
            failure: {'message': 'get delay: all proxies timeout'}
            success: {'自动选择': 34, '🇦🇺澳大利亚悉尼': 159, ...}
    """
    from urllib.parse import quote

    # Group names are free text; without escaping, a '/', '?' or '#' in the
    # name would change which path or query is actually requested.
    group = quote(provider_name, safe="")
    url = f"http://{external_controller}/group/{group}/delay?url=https%3A%2F%2Fwww.gstatic.com%2Fgenerate_204&timeout=2000"
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberate best-effort: callers receive the error as data.
            return {"error": str(e)}
def get_provider_name(sub_file: Path|str) -> Optional[str]:
    """Return the name of the first proxy group in a subscription file.

    Args:
        sub_file: Path of the subscription YAML file.

    Returns:
        The ``name`` of the first ``proxy-groups`` entry, or ``None`` when
        the document is empty or not a mapping, has no ``proxy-groups``, or
        the first group has no (or an empty) ``name``.
    """
    with open(sub_file, "r", encoding="utf-8") as f:
        sub_yaml = yaml.safe_load(f)

    # safe_load returns None for an empty file; treat that like "no groups".
    if not isinstance(sub_yaml, dict):
        return None

    groups = sub_yaml.get("proxy-groups", [])
    if not groups:
        return None

    # An empty name is reported as None, same as a missing one.
    return groups[0].get("name", "") or None
def get_external_controller(config_path: Path|str) -> Optional[str]:
    """Read the ``external-controller`` entry from a YAML config file.

    Args:
        config_path: Path of the config file.

    Returns:
        The value stored under ``external-controller``, or ``None`` when the
        key is absent or the document is empty or not a mapping.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # An empty file loads as None; a membership test on it would raise.
    if not isinstance(config, dict):
        return None
    return config.get("external-controller")
def save_yaml_dump(config: dict, save_as: Path) -> Path:
    """Write *config* to *save_as* as YAML and return *save_as*.

    Missing parent directories are created. The document is emitted with the
    safe dumper, non-ASCII characters kept as-is, a two-space indent and keys
    in their existing order.
    """
    target_dir = save_as.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    dump_options = {
        "Dumper": yaml.SafeDumper,
        "allow_unicode": True,
        "indent": 2,
        "sort_keys": False,
    }
    with open(save_as, 'w', encoding='utf-8') as stream:
        yaml.dump(config, stream, **dump_options)

    return save_as
async def async_get_sub(sub_url: str, save_path: Path, timeout: int = 10) -> Path:
    """Download a subscription file and store it on disk.

    The request is sent with a ``clash-verge/v1.7.5`` User-Agent and follows
    redirects.

    Args:
        sub_url: URL of the subscription.
        save_path: Destination file; parent directories are created.
        timeout: Request timeout passed to httpx.

    Returns:
        ``save_path`` as a ``Path``.

    Raises:
        HTTPException: With status 500 when the request fails or the server
            answers with an error status; the httpx error is chained as the
            cause.
    """
    headers = {'User-Agent': 'clash-verge/v1.7.5'}
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(sub_url, headers=headers, follow_redirects=True, timeout=timeout)
            resp.raise_for_status()
    except httpx.HTTPError as e:
        # Chain explicitly so the original httpx error stays in the traceback.
        raise HTTPException(status_code=500, detail=f"订阅获取失败: {str(e)}") from e

    # Callers may pass a plain string despite the annotation.
    save_path = Path(save_path)
    save_path.parent.mkdir(parents=True, exist_ok=True)
    save_path.write_text(resp.text, encoding='utf-8')
    return save_path
async def get_sub(sub_url: str, save_path: Path, timeout: int = 10) -> Path:
    """Download a subscription file and store it on disk.

    Kept as an alias of ``async_get_sub``: the two functions had identical
    bodies, so this one delegates to keep a single implementation.

    Args:
        sub_url: URL of the subscription.
        save_path: Destination file; parent directories are created.
        timeout: Request timeout passed to httpx.

    Returns:
        ``save_path`` as a ``Path``.

    Raises:
        HTTPException: With status 500 when the download fails.
    """
    return await async_get_sub(sub_url, save_path, timeout)
def update_config(
    read_path: Path,
    config_update: dict,
    save_as: Optional[Path] = None,
) -> Path:
    """Merge *config_update* into a YAML config file and write the result.

    Args:
        read_path: Existing config to start from. When it does not exist, or
            the file is empty, the merge starts from an empty mapping.
        config_update: Top-level keys to set; existing keys are overwritten
            (a shallow ``dict.update``, not a deep merge).
        save_as: Where to write the result; defaults to ``read_path``.

    Returns:
        The path the merged config was written to.
    """
    config: Dict[str, Any] = {}
    if read_path.exists():
        with open(read_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f) or {}

    config.update(config_update)

    save_as = save_as or read_path
    # Hand the written path back; the signature always promised a Path.
    return save_yaml_dump(config, save_as)
async def find_free_port(scope=(9350, 18000)):
    """Find a local port that nothing is accepting TCP connections on.

    Ports are probed in ascending order by trying to connect to
    ``127.0.0.1``; the first port whose connection attempt fails is returned.

    :param scope: arguments for ``range`` -- normally ``(start, stop)`` with
        ``stop`` exclusive. ``None`` selects the default ``(9350, 18000)``.
    :return: the first port in the range that refused the connection
    :raises OSError: when every port in the range accepted a connection
    """
    if scope is None:
        scope = (9350, 18000)

    for port in range(*scope):
        try:
            _, writer = await asyncio.open_connection('127.0.0.1', port)
        except OSError:
            # Covers ConnectionRefusedError (an OSError subclass): nothing
            # answered on this port, so it is considered free.
            return port

        # Something accepted the connection, so the port is taken. Errors
        # while closing the probe must not be mistaken for a free port.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass

    raise OSError('未找到可用端口。')
async def port_is_using(port, ip='127.0.0.1', timeout=1.0):
    """Report whether something accepts TCP connections on ``ip:port``.

    :param port: port number to probe
    :param ip: address to connect to
    :param timeout: seconds to wait for the connection to be established
    :return: ``True`` when the connection succeeds; ``False`` when it is
        refused, fails with another ``OSError``, or times out
    """
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, port),
            timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        # A timeout is treated the same as "nothing is listening".
        return False

    # The connection was established, so the port is in use. An error while
    # tearing the probe down does not change that answer.
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass
    return True
async def download_mihomo(download_path: Path, download_url: str):
    """Download the mihomo executable unless it is already present.

    Args:
        download_path: Where the file is stored. If it already exists the
            function returns immediately without downloading.
        download_url: URL to fetch; redirects are followed.

    Raises:
        RuntimeError: When the download, the write or the chmod fails; the
            underlying exception is chained as the cause.
    """
    if download_path.exists():
        return

    try:
        # Use the async client so a slow download (up to 30 s) does not
        # block the event loop this coroutine runs on.
        async with httpx.AsyncClient() as client:
            resp = await client.get(download_url, timeout=30, follow_redirects=True)
        resp.raise_for_status()
        download_path.parent.mkdir(parents=True, exist_ok=True)
        download_path.write_bytes(resp.content)
        # Mark the downloaded file as executable.
        download_path.chmod(0o755)
    except Exception as e:
        raise RuntimeError(f"下载mihomo失败: {str(e)}") from e
|