import asyncio
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import httpx
import yaml
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from sqlmodel import Session

from config.logu import logger, get_logger
from config.settings import settings
from utils.sub import async_get_sub
from database.engine import engine, get_session
from database.models.subscription import SubscriptionManager, SubscriptFile

router = APIRouter()


class SubscriptionBatchRequest(BaseModel):
    """Request body for the batch update endpoint: the subscription URLs to fetch."""

    urls: List[str]


class SubscriptionResponse(BaseModel):
    """Response item declared as the element type of ``list_subscriptions``."""

    file_name: str
    provider_name: str
    updated_at: datetime
    proxies: List[Dict]


def _subscription_save_path(url: str):
    """Return the location under ``settings.PATH_SUBSCRIPTION_DIR`` for *url*.

    The file name is the first eight hex characters of the URL's MD5 digest
    followed by ``.yaml``, so the same URL always maps to the same file.
    """
    digest = hashlib.md5(url.encode()).hexdigest()[:8]
    return settings.PATH_SUBSCRIPTION_DIR / f"{digest}.yaml"


async def process_url(url: str, save_path, timeout: int = 5) -> SubscriptFile:
    """Fetch a single subscription URL and describe the outcome.

    Args:
        url: Subscription URL to fetch.
        save_path: Destination handed to ``async_get_sub``.
        timeout: Timeout value forwarded to ``async_get_sub``; defaults to 5,
            the value that used to be hard-coded here.

    Returns:
        A ``SubscriptFile`` with ``error=0`` and the path returned by
        ``async_get_sub`` on success, or ``error=1``, an empty ``file_path``
        and the failure message in ``detail`` when the fetch raised.
    """
    try:
        save_path_res = await async_get_sub(url, save_path, timeout=timeout)
    except Exception as e:
        # Deliberate best-effort: one failing URL must not abort the whole
        # batch, so the failure is logged and reported as a record instead.
        logger.error(f"更新订阅失败: {url}, 错误: {e}")
        return SubscriptFile(
            url=url,
            file_path="",
            updated_at=datetime.now(),
            error=1,
            detail={"msg": f"更新订阅失败: {e}"},
        )
    return SubscriptFile(
        url=url,
        file_path=str(save_path_res),
        updated_at=datetime.now(),
        error=0,
        detail={"msg": "更新订阅成功"},
    )


@router.post("/")
async def add_subscriptions(sub: SubscriptionBatchRequest) -> List[SubscriptFile]:
    """Fetch a batch of subscription URLs concurrently and record each result.

    Every distinct URL is fetched once via ``process_url`` and its result is
    passed to ``SubscriptionManager.add_subscription_meta``. The returned list
    has one entry per requested URL, in request order; a URL that appears more
    than once in the request reuses the result of its single fetch.
    """
    logger.info(f"开始批量更新订阅: {sub.urls}")

    # Duplicate URLs resolve to the same save path, so fetching them
    # concurrently would have two tasks writing one file. Fetch each distinct
    # URL only once (dict.fromkeys keeps first-seen order).
    unique_urls = list(dict.fromkeys(sub.urls))
    fetched = await asyncio.gather(
        *(process_url(url, _subscription_save_path(url)) for url in unique_urls)
    )

    db = SubscriptionManager()
    stored_by_url = {
        url: db.add_subscription_meta(result)
        for url, result in zip(unique_urls, fetched)
    }
    # Expand back to the caller's original list so the response length and
    # order still match the request.
    return [stored_by_url[url] for url in sub.urls]


@router.get("/")
async def list_subscriptions() -> List[SubscriptionResponse]:
    # NOTE(review): only the opening statements of this handler were visible
    # when this section was reformatted; they are reproduced unchanged and any
    # remaining body that follows in the file is untouched — confirm that the
    # continuation still lines up with this indentation.
    ret = []
    db = SubscriptionManager()
    db_sub_models = db.get_subscription_meta()