mihomo_service.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import httpx
  2. from config.settings import settings
  3. from pathlib import Path
  4. import os
  5. from contextlib import closing
  6. from socket import socket, AF_INET, SOCK_STREAM
  7. def find_free_port(scope=(9350, 18000)):
  8. """
  9. 查找一个可用端口
  10. :param scope: 指定端口范围,为None时使用默认范围(9600-19600)
  11. :return: 可以使用的端口号
  12. """
  13. for port in range(*scope):
  14. with closing(socket(AF_INET, SOCK_STREAM)) as sock:
  15. try:
  16. # 尝试绑定端口,如果成功则说明端口空闲
  17. sock.bind(('127.0.0.1', port))
  18. return port
  19. except OSError:
  20. # 端口已被占用,继续尝试下一个
  21. continue
  22. raise OSError('未找到可用端口。')
  23. def port_is_using(ip, port):
  24. """检查端口是否被占用"""
  25. with closing(socket(AF_INET, SOCK_STREAM)) as sock:
  26. sock.settimeout(.1)
  27. result = sock.connect_ex((ip, int(port)))
  28. return result == 0
  29. async def download_mihomo():
  30. """下载mihomo可执行文件"""
  31. if settings.MIHOMO_BIN_PATH.exists():
  32. return
  33. try:
  34. resp = httpx.get(settings.MIHOMO_DOWNLOAD_URL, timeout=30)
  35. resp.raise_for_status()
  36. settings.MIHOMO_BIN_PATH.parent.mkdir(parents=True, exist_ok=True)
  37. settings.MIHOMO_BIN_PATH.write_bytes(resp.content)
  38. settings.MIHOMO_BIN_PATH.chmod(0o755)
  39. except Exception as e:
  40. raise RuntimeError(f"下载mihomo失败: {str(e)}")