| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- import asyncio
- from copy import deepcopy
- import hashlib
- from pathlib import Path
- import random
- from typing import Dict, List, Optional
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel
- from cachetools import TTLCache
- from utils.win import get_proxy_settings
- from utils.mihomo import get_sub,update_config,save_yaml_dump,find_free_port,port_is_using
- from utils.config import Config, config, PROXY_POLL_DIR, Sub, Proxy
- from utils.process_mgr import process_manager
- from utils.logu import get_logger,logger
- from src.services.subscription_manager import SubscriptionManager
# Shared manager for the subscription and the proxy instances built from it.
sub_mgr = SubscriptionManager(config=config)

# Module-wide asyncio lock; create_proxy holds it while it allocates ports
# and starts a proxy.
proxy_lock = asyncio.Lock()

# Router that collects every endpoint declared in this module.
router = APIRouter()

# Small TTL cache (at most 100 entries, 360-second lifetime); ping_proxies
# stores its result here.
cache = TTLCache(maxsize=100, ttl=360)
class SysProxyResponse(BaseModel):
    """Payload of ``GET /sys``: the system proxy settings."""

    # "Enabled" flag as returned by get_proxy_settings().
    sys_open: bool
    # Proxy server string as returned by get_proxy_settings().
    proxy_server: str
class SubUrlPost(BaseModel):
    """Request body of ``POST /subs``."""

    # URL handed to SubscriptionManager.download_subscription().
    sub_url: str
@router.get("/sys")
def read_root():
    """Report the current system proxy settings.

    Returns:
        SysProxyResponse: built from the ``(enabled, server)`` pair that
        ``get_proxy_settings()`` returns.
    """
    enabled, server = get_proxy_settings()
    return SysProxyResponse(sys_open=enabled, proxy_server=server)
class ProxyResponse(BaseModel):
    """Status snapshot of a single managed proxy instance."""

    # Name of the currently selected node, or the stored name as a fallback.
    name: str
    # Port taken from the stored proxy entry.
    port: int
    # True when the port answers and the manager reports no error.
    reachable: bool
    # Config file path taken from the stored proxy entry.
    file_path: str
    # Management URL reported by the proxy manager.
    mgr_url: str
    # Process details from the proxy manager; only filled when the port answers.
    process_info: Optional[Dict] = None
    # Ping results keyed by name (str -> int).
    ping: Optional[Dict[str, int]] = None
async def get_proxy_response(port: int, use_cache: bool = True):
    """Build the status snapshot for the proxy registered on ``port``.

    Args:
        port: Port of the proxy instance to inspect.
        use_cache: Accepted for interface compatibility; not used here.

    Returns:
        ProxyResponse: name, port, reachability, config path, management URL
        and (when the port answers) process information.

    Raises:
        HTTPException: 404 when ``port`` has no stored proxy entry or no
            proxy manager, instead of failing with an AttributeError.
    """
    proxy_model = sub_mgr.sub.proxies.get(port)
    proxy_mgr = sub_mgr.list_proxies_mgr.get(port)
    if proxy_model is None or proxy_mgr is None:
        raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")

    mgr_url = proxy_mgr.get_management_url()
    reachable = await port_is_using(port, timeout=0.5)

    # Fall back to the stored name unless the running proxy reports one.
    name = proxy_model.name or ''
    process_info = None
    ping = {}

    if reachable:
        process_info = proxy_mgr.get_process_info()
        selected = await proxy_mgr.get_now_selected_proxy()
        # An open port is not enough: the manager must also answer with err == 0.
        reachable = selected.get("err", 1) == 0
        selected_name = selected.get("name", '')
        if selected_name:
            name = selected_name
            # Keep the stored name in sync with what the proxy actually uses.
            if proxy_model.name and proxy_model.name != selected_name:
                proxy_model.name = selected_name
                sub_mgr.save_config()

    return ProxyResponse(
        name=name,
        port=proxy_model.port,
        reachable=reachable,
        file_path=proxy_model.file_path,
        mgr_url=mgr_url,
        process_info=process_info,
        ping=ping,
    )
async def get_all_proxy_response(use_cache: bool = True):
    """Collect the status snapshot of every stored proxy concurrently.

    Args:
        use_cache: Forwarded unchanged to ``get_proxy_response``.

    Returns:
        list: one ``get_proxy_response`` result per entry in
        ``sub_mgr.sub.proxies``, in iteration order.
    """
    pending = [
        get_proxy_response(port, use_cache)
        for port, _ in sub_mgr.sub.proxies.items()
    ]
    return await asyncio.gather(*pending)
async def health_check_proxy_task(interval: int = 80):
    """Periodically check the health status of all proxies.

    Loops forever: each round queries every proxy through
    ``get_all_proxy_response`` and logs any failure instead of propagating it.

    Args:
        interval: Seconds to sleep between two rounds.
    """
    while True:
        try:
            logger.info("Running health check...")
            await get_all_proxy_response(use_cache=True)
        except Exception as exc:
            logger.error(f"Health check failed: {exc}")

        # Wait for the configured interval before the next round.
        await asyncio.sleep(interval)
@router.get("/ping")
async def ping_proxies() -> Dict[str, int]:
    """Return the ping results of the subscription's proxies, cached by TTL.

    A cached result is served while it is still alive; otherwise
    ``sub_mgr.ping_proxies()`` is awaited and its result is stored.

    Returns:
        The ping result on success, or ``{"err": 1, "msg": <text>}`` when
        pinging (or storing the result) raised.
    """
    # NOTE(review): the error payload carries a str under "msg", which does not
    # match the Dict[str, int] annotation — confirm how the installed FastAPI
    # version treats the return annotation.
    cache_key = "ping_result"

    # Single lookup: a membership test followed by a separate read can raise
    # KeyError when the entry expires between the two steps.
    try:
        cached = cache[cache_key]
    except KeyError:
        pass
    else:
        logger.info(f"use cache: {cache_key}")
        return cached

    try:
        result = await sub_mgr.ping_proxies()
        cache[cache_key] = result
    except Exception as e:
        logger.error(f"ping_proxies error: {e}")
        return {"err": 1, "msg": str(e)}
    return result
@router.get("/proxies/{port}")
@router.get("/proxies")
async def get_proxies(port: int = None):
    """Return one proxy's status, or every proxy's status when no port is given.

    Args:
        port: Port of the proxy to look up; a falsy value lists all proxies.

    Returns:
        A single ``get_proxy_response`` result, or the list produced by
        ``get_all_proxy_response``.

    Raises:
        HTTPException: 404 when no proxy manager exists for ``port``.
    """
    if not port:
        everything = await get_all_proxy_response()
        logger.info(f"{everything}")
        return everything

    if not sub_mgr.get_proxy_manager(port):
        raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")
    return await get_proxy_response(port)
class ProxyPost(BaseModel):
    """Request body of ``POST /proxies``."""

    # Optional proxy name; create_proxy does not read it.
    name: Optional[str] = None
    # Explicit port to start the proxy on (used when ``auto`` is falsy).
    port: Optional[int] = None
    # When true, create_proxy picks free ports itself.
    auto: Optional[bool] = False
class ProxyPostResponse(BaseModel):
    """Envelope returned by ``POST /proxies``."""

    # 0 on success, non-zero on failure.
    err: int = 1
    # Human-readable status or error text.
    msg: str = ''
    # Snapshot of the affected proxy. Optional because error responses built
    # with only ``err``/``msg`` carry no snapshot; as a required field those
    # constructions raised a validation error.
    data: Optional[ProxyResponse] = None
async def auto_select_proxy(port: int):
    """Pick a random node from the ping results and select it on ``port``.

    Args:
        port: Port of the proxy instance whose node should be switched.

    Raises:
        HTTPException: 500 when pinging failed or returned no candidates, so
            an error key is never passed on as a proxy name.
    """
    ping_res = await ping_proxies()
    if not ping_res:
        raise HTTPException(status_code=500, detail="ping_proxies returned no proxies to select from")

    # ping_proxies() reports failures as {"err": 1, "msg": "<text>"}; genuine
    # results map names to ints, so a str under "msg" marks the error payload.
    if "err" in ping_res and isinstance(ping_res.get("msg"), str):
        raise HTTPException(status_code=500, detail=f"ping_proxies error: {ping_res['msg']}")

    name = random.choice(list(ping_res.keys()))
    await sub_mgr.select_proxy(port, name)
@router.delete("/proxies/{port}")
async def delete_proxy(port: int):
    """Stop the proxy on ``port``, drop its stored entry and list what is left.

    Args:
        port: Port of the proxy instance to delete.

    Returns:
        The list produced by ``get_all_proxy_response`` after the deletion.

    Raises:
        HTTPException: 404 when no proxy manager exists for ``port``;
            500 (with the error text as detail) for any other failure.
    """
    try:
        proxy_mgr = sub_mgr.get_proxy_manager(port)
        if not proxy_mgr:
            raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")

        await sub_mgr.stop_proxy(port)
        if port in sub_mgr.sub.proxies:
            del sub_mgr.sub.proxies[port]
            sub_mgr.save_config()

        return await get_all_proxy_response()
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the 404
        # above would be swallowed below and reported as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to delete proxy {port}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/proxies")
async def create_proxy(request: ProxyPost):
    """Create and start a proxy instance, then auto-select a node for it.

    With ``request.auto`` the proxy port and the controller port are searched
    with ``find_free_port``; otherwise ``request.port`` is used and the
    controller port is ``request.port + 1``. The whole operation runs under
    ``proxy_lock``.

    Args:
        request: Creation parameters; either ``auto`` or ``port`` must be set.

    Returns:
        ProxyPostResponse: ``err=0`` with the proxy snapshot, both for a newly
        started proxy and for one that was already running on that port.

    Raises:
        HTTPException: 400 with ``{"err": 1, "msg": ...}`` as detail when a
            requested port is occupied or neither ``port`` nor ``auto`` is given.
    """
    logger.info(f"request: {request}")
    async with proxy_lock:
        if request.auto:
            porxy_port = await find_free_port((sub_mgr.sub.start_port, sub_mgr.sub.start_port + 10000))
            controler_port = await find_free_port((porxy_port + 1, porxy_port + 10001))
        elif request.port:
            porxy_port = request.port
            proxy_mgr = sub_mgr.get_proxy_manager(porxy_port)
            if proxy_mgr and proxy_mgr.running:
                # Already running on this port: report it and skip creation.
                return ProxyPostResponse(err=0, msg=f"已开启,跳过 {porxy_port} ", data=await get_proxy_response(porxy_port))
            controler_port = porxy_port + 1
            # Error details are plain dicts: HTTPException details must be
            # JSON-serializable, and ProxyPostResponse cannot be built without
            # its ``data`` field.
            if await port_is_using(porxy_port):
                raise HTTPException(status_code=400, detail={"err": 1, "msg": f"porxy_port={porxy_port} 端口已被占用"})
            if await port_is_using(controler_port):
                raise HTTPException(status_code=400, detail={"err": 1, "msg": f"controler_port={controler_port} 端口已被占用"})
        else:
            raise HTTPException(status_code=400, detail={"err": 1, "msg": "port 或 auto 必须有一个"})

        await sub_mgr.create_custom_config(porxy_port, controler_port)
        await sub_mgr.start_proxy(porxy_port)
        await auto_select_proxy(porxy_port)
        res = ProxyPostResponse(err=0, msg="ok", data=await get_proxy_response(porxy_port))
        logger.info(f"{res}")
        return res
@router.post("/proxies/{port}/stop")
async def stop_proxy(port: int):
    """Stop the proxy on ``port`` and return its status afterwards.

    Args:
        port: Port of the proxy instance to stop.

    Returns:
        The ``get_proxy_response`` result for ``port`` after stopping.

    Raises:
        HTTPException: 404 when no proxy manager exists for ``port``.
    """
    if not sub_mgr.get_proxy_manager(port):
        raise HTTPException(status_code=404, detail=f"Proxy with port {port} not found")

    await sub_mgr.stop_proxy(port)
    return await get_proxy_response(port)
@router.get("/subs")
async def get_subscriptions():
    """Return the subscription currently held by the manager.

    Returns:
        dict: ``{"err": 0, "data": sub_mgr.sub}``.
    """
    payload = {"err": 0, "data": sub_mgr.sub}
    return payload
@router.post("/subs")
async def create_subscription(request: SubUrlPost):
    """Download the subscription at ``request.sub_url``.

    Args:
        request: Body carrying the subscription URL.

    Returns:
        dict: ``{"err": 0, "data": <sub>}`` on success, or
        ``{"err": 1, "msg": <error text>}`` when anything raised.
    """
    try:
        downloaded = await sub_mgr.download_subscription(request.sub_url)
        payload = {"err": 0, "data": downloaded.sub}
    except Exception as exc:
        payload = {"err": 1, "msg": str(exc)}
    return payload
def main():
    """Print whether the system proxy is enabled and, if so, its server."""
    # Read the system proxy settings.
    enabled, server = get_proxy_settings()
    message = f"代理已启用,代理服务器地址和端口: {server}" if enabled else "代理未启用"
    print(message)


if __name__ == "__main__":
    main()
|