| 123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- import httpx
- from config.settings import settings
- from pathlib import Path
- import os
- from contextlib import closing
- from socket import socket, AF_INET, SOCK_STREAM
- def find_free_port(scope=(9350, 18000)):
- """
- 查找一个可用端口
- :param scope: 指定端口范围,为None时使用默认范围(9600-19600)
- :return: 可以使用的端口号
- """
- for port in range(*scope):
- with closing(socket(AF_INET, SOCK_STREAM)) as sock:
- try:
- # 尝试绑定端口,如果成功则说明端口空闲
- sock.bind(('127.0.0.1', port))
- return port
- except OSError:
- # 端口已被占用,继续尝试下一个
- continue
- raise OSError('未找到可用端口。')
- def port_is_using(ip, port):
- """检查端口是否被占用"""
- with closing(socket(AF_INET, SOCK_STREAM)) as sock:
- sock.settimeout(.1)
- result = sock.connect_ex((ip, int(port)))
- return result == 0
- async def download_mihomo():
- """下载mihomo可执行文件"""
- if settings.MIHOMO_BIN_PATH.exists():
- return
-
- try:
- resp = httpx.get(settings.MIHOMO_DOWNLOAD_URL, timeout=30)
- resp.raise_for_status()
- settings.MIHOMO_BIN_PATH.parent.mkdir(parents=True, exist_ok=True)
- settings.MIHOMO_BIN_PATH.write_bytes(resp.content)
- settings.MIHOMO_BIN_PATH.chmod(0o755)
- except Exception as e:
- raise RuntimeError(f"下载mihomo失败: {str(e)}")
|