"""FastAPI routes for adding and listing proxy subscriptions."""

import asyncio
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import httpx
import yaml
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from sqlmodel import Session, select

from config.logu import logger, get_logger
from config.settings import settings
from utils.sub import async_get_sub
from database.engine import engine, get_session
from database.models.subscription import SubscriptionManager, SubscriptFile, MihomoMeta

router = APIRouter()


class SubscriptionBatchRequest(BaseModel):
    """Request body accepted by ``add_subscriptions``."""

    # Subscription URLs to download and register.
    urls: List[str]
    # When true, URLs that are already registered are downloaded again
    # instead of being skipped.
    overwrite: bool = False


class SubscriptionResponse(BaseModel):
    """Schema pairing a subscription file with its provider and proxies."""

    file_name: str
    provider_name: str
    updated_at: datetime
    proxies: List[Dict]


async def process_url(url: str, save_path) -> SubscriptFile:
    """Download a single subscription and describe the outcome.

    Any exception raised while downloading is logged and converted into a
    failure record rather than propagated, so one bad URL does not abort a
    batch run with ``asyncio.gather``.

    Args:
        url: Subscription URL to fetch.
        save_path: Destination handed to ``async_get_sub``.

    Returns:
        A ``SubscriptFile`` with ``error=0`` and the saved path on success,
        or ``error=1`` and an empty ``file_path`` on failure.
    """
    try:
        save_path_res = await async_get_sub(url, save_path, timeout=5)
        return SubscriptFile(
            url=url,
            file_path=str(save_path_res),
            updated_at=datetime.now(),
            error=0,
            detail={"msg": "更新订阅成功"},
        )
    except Exception as e:
        logger.error(f"更新订阅失败: {url}, 错误: {str(e)}")
        return SubscriptFile(
            url=url,
            file_path="",
            updated_at=datetime.now(),
            error=1,
            detail={"msg": f"更新订阅失败: {str(e)}"},
        )


@router.post("/")
async def add_subscriptions(sub: SubscriptionBatchRequest) -> List[SubscriptFile]:
    """Download and register a batch of subscription URLs.

    URLs that are already registered are skipped unless ``sub.overwrite`` is
    set. The remaining URLs are downloaded concurrently, validated and stored
    through ``SubscriptionManager``.

    Args:
        sub: The URLs to process and the overwrite flag.

    Returns:
        The currently registered subscriptions when there is nothing to
        download. Otherwise one record per processed URL: the stored record
        on success, or the record annotated with ``error=1`` and a ``detail``
        message when validation or storage failed.
    """
    logger.info(f"开始批量更新订阅: {sub.urls}")
    db = SubscriptionManager()
    subscription_meta_list = db.get_subscription_meta()
    urls = [
        subscription_meta.url
        for subscription_meta in subscription_meta_list
        if subscription_meta.url
    ]
    logger.info(f"exist urls {urls}")
    known_urls = set(urls)

    tasks = []
    # dict.fromkeys drops repeated URLs while keeping request order; two
    # concurrent downloads of the same URL would write the same target file.
    for url in dict.fromkeys(sub.urls):
        if url in known_urls and not sub.overwrite:
            logger.info(f"{url} already exist, skip add")
            continue
        # The file name is the first 8 hex digits of the URL's MD5 digest.
        digest = hashlib.md5(url.encode()).hexdigest()[:8]
        save_path = settings.PATH_SUBSCRIPTION_DIR / f"{digest}.yaml"
        tasks.append(process_url(url, save_path))

    if not tasks:
        return subscription_meta_list

    results: List[SubscriptFile] = await asyncio.gather(*tasks)
    logger.info(f"批量更新订阅完成: {results}")

    db_results = []
    for result in results:
        try:
            name, _groups, proxies = db.check_valid(result)
            result.name = name
            db_results.append(
                db.add_subscription_meta(result, proxies, overwrite=sub.overwrite)
            )
        except Exception as e:
            # Report the failure instead of dropping it silently: log it and
            # hand the annotated record back so the caller can see which URL
            # failed and why.
            logger.error(
                f"subscription validation/save failed: {result.url}, error: {e}"
            )
            result.error = 1
            result.detail = {"valid": str(e)}
            db_results.append(result)
    return db_results


@router.get("/")
async def list_subscriptions() -> List[SubscriptFile]:
    """Return the records from ``SubscriptionManager.get_subscription_meta``."""
    db = SubscriptionManager()
    return db.get_subscription_meta()


@router.get("/proxies")
async def list_proxies() -> Dict[str, List[MihomoMeta]]:
    """Return the result of ``SubscriptionManager.get_proxies_by_provider``."""
    db = SubscriptionManager()
    return db.get_proxies_by_provider()