"""FastAPI routes for inspecting and managing local proxy instances and subscriptions."""
import asyncio
from copy import deepcopy
import hashlib
from pathlib import Path
import random
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, HTTPException
from fastapi.requests import Request
from pydantic import BaseModel
from cachetools import TTLCache

from utils.win import get_proxy_settings
from utils.mihomo import get_sub, update_config, save_yaml_dump, find_free_port, port_is_using
from utils.config import Config, config, PROXY_POLL_DIR, Sub, Proxy
from utils.process_mgr import process_manager
from utils.logu import get_logger, logger
from src.services.subscription_manager import SubscriptionManager
from src.services.proxy_manager import ProxyManager

sub_mgr = SubscriptionManager(config=config)
proxy_lock = asyncio.Lock()  # Global async lock; held while a proxy is being created.
router = APIRouter()
cache = TTLCache(maxsize=100, ttl=360)

# Keys used in the module-level TTL cache.
PING_CACHE_KEY = "ping_result"
PROXY_POOL_CACHE_KEY = "proxy_pool"


class SysProxyResponse(BaseModel):
    """Payload of ``GET /sys``: the values returned by ``get_proxy_settings()``."""

    sys_open: bool
    proxy_server: str


class SubUrlPost(BaseModel):
    """Request body of ``POST /subs``."""

    sub_url: str


class ProxyPoolResponse(BaseModel):
    """Payload of ``GET /proxies-pool``."""

    # "127.0.0.1:<port>" entries for every proxy reported as reachable.
    proxies: List[str]
    # True when the list was served from the TTL cache.
    cached: bool


class ProxyResponse(BaseModel):
    """Status snapshot of a single proxy instance."""

    name: str
    port: int
    reachable: bool
    file_path: str
    mgr_url: str
    process_info: Optional[Dict] = None
    ping: Optional[Dict[str, int]] = None


class ProxyPost(BaseModel):
    """Request body of ``POST /proxies``; either ``port`` or ``auto`` must be set."""

    name: Optional[str] = None
    port: Optional[int] = None
    auto: Optional[bool] = False


class ProxyPostResponse(BaseModel):
    """Payload of a successful ``POST /proxies``."""

    err: int = 1
    msg: str = ''
    data: ProxyResponse


class StartupRequest(BaseModel):
    """Request body of ``POST /startup``."""

    auto_start: bool


def _invalidate_proxy_pool_cache() -> None:
    """Drop the cached proxy pool so the next ``/proxies-pool`` call rebuilds it."""
    # pop() with a default avoids a KeyError if the entry has already expired.
    cache.pop(PROXY_POOL_CACHE_KEY, None)


async def _get_ping_result():
    """Return the ping result, served from the TTL cache when available.

    Raises:
        Exception: whatever ``sub_mgr.ping_proxies()`` raises; failures are
            not cached.
    """
    cached = cache.get(PING_CACHE_KEY)
    if cached is not None:
        logger.info(f"use cache: {PING_CACHE_KEY}")
        return cached
    result = await sub_mgr.ping_proxies()
    cache[PING_CACHE_KEY] = result
    return result


@router.get("/sys")
async def read_root(request: Request):
    """Report the proxy settings returned by ``get_proxy_settings()``."""
    proxy_enable, proxy_server = get_proxy_settings()
    return SysProxyResponse(sys_open=proxy_enable, proxy_server=proxy_server)


async def get_proxy_response(port: int) -> ProxyResponse:
    """Build the status snapshot for the proxy registered on ``port``.

    Args:
        port: Port used as the key in ``sub_mgr.sub.proxies`` and
            ``sub_mgr.list_proxies_mgr``.

    Returns:
        A ``ProxyResponse`` describing the proxy.

    Raises:
        HTTPException: 404 when no proxy model or manager exists for ``port``.
    """
    proxy_model = sub_mgr.sub.proxies.get(port)
    proxy_mgr = sub_mgr.list_proxies_mgr.get(port)
    if proxy_model is None or proxy_mgr is None:
        raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")

    mgr_url = proxy_mgr.get_management_url()
    reachable = await port_is_using(port, timeout=0.5)
    name = proxy_model.name or ''
    process_info = None
    ping = {}  # Per-proxy ping is not collected here; see the /ping route.

    if reachable:
        process_info = proxy_mgr.get_process_info()
        response = await proxy_mgr.get_now_selected_proxy()
        # The port being open is not enough: the manager must also report err == 0.
        reachable = response.get("err", 1) == 0
        selected = response.get("name", '')
        if selected:
            name = selected
            # Persist the selected name when it differs from the stored one.
            if proxy_model.name and proxy_model.name != selected:
                proxy_model.name = selected
                sub_mgr.save_config()

    return ProxyResponse(
        name=name,
        port=proxy_model.port,
        reachable=reachable,
        file_path=proxy_model.file_path,
        mgr_url=mgr_url,
        process_info=process_info,
        ping=ping,
    )


async def get_all_proxy_response(use_cache: bool = True) -> List[ProxyResponse]:
    """Collect the status of every configured proxy concurrently.

    Args:
        use_cache: Currently unused; kept for interface compatibility.

    Returns:
        One ``ProxyResponse`` per entry in ``sub_mgr.sub.proxies``.
    """
    tasks = [get_proxy_response(port) for port in sub_mgr.sub.proxies]
    return await asyncio.gather(*tasks)


@router.get("/ping")
async def ping_proxies() -> Dict[str, Any]:
    """Return the (cached) ping result, or ``{"err": 1, "msg": ...}`` on failure."""
    try:
        return await _get_ping_result()
    except Exception as e:
        logger.error(f"ping_proxies error: {e}")
        return {"err": 1, "msg": str(e)}


@router.get("/proxies-pool")
async def get_proxies_pool(force_refresh: bool = False):
    """List reachable proxies as ``127.0.0.1:<port>`` strings.

    Args:
        force_refresh: When true, bypass the cache and rebuild the list.
    """
    if not force_refresh:
        cached = cache.get(PROXY_POOL_CACHE_KEY)
        if cached is not None:
            return ProxyPoolResponse(proxies=cached, cached=True)

    all_proxies = await get_all_proxy_response()
    # Health check: only proxies reported as reachable are included.
    proxies = [f"127.0.0.1:{p.port}" for p in all_proxies if p.reachable]

    # Update the cache and return.
    cache[PROXY_POOL_CACHE_KEY] = proxies
    return ProxyPoolResponse(proxies=proxies, cached=False)


@router.get("/proxies/{port}")
@router.get("/proxies")
async def get_proxies(port: Optional[int] = None):
    """Return one proxy's status when ``port`` is given, otherwise all of them.

    Raises:
        HTTPException: 404 when ``port`` is given but no manager exists for it.
    """
    if port is not None:
        proxy_mgr = sub_mgr.get_proxy_manager(port)
        if not proxy_mgr:
            raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")
        return await get_proxy_response(port)

    ret = await get_all_proxy_response()
    logger.info(f"{ret}")
    return ret


async def auto_select_proxy(port: int):
    """Select a random proxy name from the ping result for the proxy on ``port``.

    Selection is best-effort: when pinging fails or yields no candidates the
    failure is logged and no selection is made, rather than passing an error
    key on as if it were a proxy name.
    """
    try:
        ping_res = await _get_ping_result()
    except Exception as e:
        logger.error(f"ping_proxies error: {e}")
        return
    if not ping_res:
        logger.info(f"auto_select_proxy: no ping candidates for port {port}, skip selection")
        return
    name = random.choice(list(ping_res.keys()))
    await sub_mgr.select_proxy(port, name)


@router.delete("/proxies/{port}")
async def delete_proxy(port: int):
    """Stop the proxy on ``port``, remove it from the config and return the rest.

    Raises:
        HTTPException: 404 when no manager exists for ``port``; 500 for any
            other failure.
    """
    try:
        proxy_mgr = sub_mgr.get_proxy_manager(port)
        if not proxy_mgr:
            raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")
        await sub_mgr.stop_proxy(port)
        if port in sub_mgr.sub.proxies:
            del sub_mgr.sub.proxies[port]
            sub_mgr.save_config()
        # Invalidate the proxy-pool cache.
        _invalidate_proxy_pool_cache()
        return await get_all_proxy_response()
    except HTTPException:
        # Keep the intended status code instead of rewrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to delete proxy {port}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/proxies")
async def create_proxy(request: ProxyPost):
    """Create and start a proxy on an automatic or an explicitly requested port.

    With ``auto`` set, free proxy and controller ports are searched starting
    at ``sub_mgr.sub.start_port``. With ``port`` set, that port and
    ``port + 1`` (controller) are used and must both be free, unless a running
    manager already exists for the port, in which case its status is returned.

    Raises:
        HTTPException: 400 when neither ``port`` nor ``auto`` is given or a
            requested port is occupied.
    """
    logger.info(f"request: {request}")
    async with proxy_lock:
        if request.auto:
            proxy_port = await find_free_port(
                (sub_mgr.sub.start_port, sub_mgr.sub.start_port + 10000)
            )
            controller_port = await find_free_port((proxy_port + 1, proxy_port + 10001))
        elif request.port:
            proxy_port = request.port
            proxy_mgr = sub_mgr.get_proxy_manager(proxy_port)
            if proxy_mgr and proxy_mgr.running:
                return ProxyPostResponse(
                    err=0,
                    msg=f"已开启,跳过 {proxy_port} ",
                    data=await get_proxy_response(proxy_port),
                )
            proxy_port_in_use = await port_is_using(proxy_port)
            controller_port = request.port + 1
            # Error details are plain dicts: ProxyPostResponse requires `data`
            # and a model instance is not a JSON-serialisable detail.
            if proxy_port_in_use:
                raise HTTPException(
                    status_code=400,
                    detail={"err": 1, "msg": f"porxy_port={proxy_port} 端口已被占用"},
                )
            if await port_is_using(controller_port):
                raise HTTPException(
                    status_code=400,
                    detail={"err": 1, "msg": f"controler_port={controller_port} 端口已被占用"},
                )
        else:
            raise HTTPException(
                status_code=400,
                detail={"err": 1, "msg": "port 或 auto 必须有一个"},
            )

        await sub_mgr.create_custom_config(proxy_port, controller_port)
        await sub_mgr.start_proxy(proxy_port)
        await auto_select_proxy(proxy_port)
        res = ProxyPostResponse(err=0, msg="ok", data=await get_proxy_response(proxy_port))
        logger.info(f"{res}")
        # Invalidate the proxy-pool cache.
        _invalidate_proxy_pool_cache()
        return res


@router.post("/proxies/{port}/stop")
async def stop_proxy(port: int):
    """Stop the proxy on ``port`` and return its updated status.

    Raises:
        HTTPException: 404 when no manager exists for ``port``.
    """
    proxy_mgr = sub_mgr.get_proxy_manager(port)
    if not proxy_mgr:
        raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")
    await sub_mgr.stop_proxy(port)
    # Invalidate the proxy-pool cache.
    _invalidate_proxy_pool_cache()
    return await get_proxy_response(port)


@router.get("/subs")
async def get_subscriptions():
    """Return the current subscription object held by ``sub_mgr``."""
    return {"err": 0, "data": sub_mgr.sub}


@router.post("/subs")
async def create_subscription(request: SubUrlPost):
    """Download the subscription at ``request.sub_url``.

    Returns:
        ``{"err": 0, "data": ...}`` on success, ``{"err": 1, "msg": ...}`` on
        failure (the error is also logged).
    """
    try:
        subscription = await sub_mgr.download_subscription(request.sub_url)
    except Exception as e:
        logger.error(f"create_subscription error: {e}")
        return {"err": 1, "msg": str(e)}
    return {"err": 0, "data": subscription.sub}


@router.post("/startup")
async def startup(request: StartupRequest):
    """Persist the auto-start flag via ``sub_mgr.save_startup`` and return the config."""
    sub_mgr.save_startup(request.auto_start)
    # Invalidate the proxy-pool cache.
    _invalidate_proxy_pool_cache()
    return {"err": 0, "msg": "ok", "data": config}


def main():
    """Print whether a proxy is enabled according to ``get_proxy_settings()``."""
    # Read the proxy settings.
    proxy_enable, proxy_server = get_proxy_settings()
    if proxy_enable:
        print(f"代理已启用,代理服务器地址和端口: {proxy_server}")
    else:
        print("代理未启用")


if __name__ == "__main__":
    main()