| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import asyncio
- import hashlib
- from pathlib import Path
- from fastapi import APIRouter, HTTPException
- from datetime import datetime, timedelta
- from pydantic import BaseModel
- from typing import Dict, List, Optional
- import httpx
- import yaml
- from config.logu import logger, get_logger
- from config.settings import settings
- from utils.sub import async_get_sub
- from database.engine import engine,get_session
- from sqlmodel import Session, select
- from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
# Router on which this module's subscription endpoints are registered.
router = APIRouter()
class SubscriptionBatchRequest(BaseModel):
    """Request body carrying a batch of subscription URLs to add or refresh."""

    # Subscription links to process in this request.
    urls: List[str]

    # When False (the default), URLs that are already stored are skipped;
    # when True they are fetched again and passed on with overwrite enabled.
    overwrite: bool = False
class SubscriptionResponse(BaseModel):
    """Schema describing one subscription file together with its proxies.

    NOTE(review): no endpoint in this section references this model —
    confirm whether it is still used elsewhere.
    """

    # Name of the stored subscription file.
    file_name: str

    # Name of the provider the file belongs to.
    provider_name: str

    # Timestamp of the last update.
    updated_at: datetime

    # Proxy entries as plain dictionaries; the key schema is not defined here.
    proxies: List[Dict]
async def process_url(url: str, save_path) -> SubscriptFile:
    """Fetch a single subscription URL and report the outcome as a record.

    Args:
        url: Subscription link to download.
        save_path: Destination passed through to ``async_get_sub``.

    Returns:
        A ``SubscriptFile`` with ``error=0`` and the saved path on success,
        or one with ``error=1`` and an empty ``file_path`` when anything in
        the fetch raised. Failures are never propagated to the caller.
    """
    try:
        saved_to = await async_get_sub(url, save_path, timeout=5)
        outcome = SubscriptFile(
            url=url,
            file_path=str(saved_to),
            updated_at=datetime.now(),
            error=0,
            detail={"msg": "更新订阅成功"},
        )
    except Exception as exc:
        # Best-effort by design: the failure is logged and encoded in the
        # returned record so that one bad URL cannot abort a whole batch.
        reason = str(exc)
        logger.error(f"更新订阅失败: {url}, 错误: {reason}")
        outcome = SubscriptFile(
            url=url,
            file_path="",
            updated_at=datetime.now(),
            error=1,
            detail={"msg": f"更新订阅失败: {reason}"},
        )
    return outcome
@router.post("/")
async def add_subscriptions(sub: SubscriptionBatchRequest) -> List[SubscriptFile]:
    """Fetch a batch of subscription URLs and store the ones that validate.

    URLs that are already stored are skipped unless ``sub.overwrite`` is set.
    The remaining URLs are downloaded concurrently, then each result is
    validated and written through ``SubscriptionManager``.

    Args:
        sub: Request body with the URLs and the overwrite flag.

    Returns:
        The records returned by ``add_subscription_meta`` for every result
        that validated. If no URL needed processing, the list of already
        stored records is returned instead. Results that fail validation or
        persistence are logged and left out of the response.
    """
    logger.info(f"开始批量更新订阅: {sub.urls}")
    db = SubscriptionManager()
    subscription_meta_list = db.get_subscription_meta()
    urls = [meta.url for meta in subscription_meta_list if meta.url]
    logger.info(f"exist urls {urls}")
    # Set lookup keeps the per-URL membership test O(1).
    known_urls = set(urls)

    tasks = []
    for url in sub.urls:
        if url in known_urls and not sub.overwrite:
            logger.info(f"{url} already exist, skip add")
            continue
        # File name is the first 8 hex chars of the URL's MD5 digest; it is
        # only computed for URLs that are actually going to be fetched.
        digest = hashlib.md5(url.encode()).hexdigest()[:8]
        save_path = settings.PATH_SUBSCRIPTION_DIR / f"{digest}.yaml"
        tasks.append(process_url(url, save_path))
    if not tasks:
        return subscription_meta_list

    # Run every download concurrently; process_url reports failures in the
    # returned record instead of raising.
    results: List[SubscriptFile] = await asyncio.gather(*tasks)
    logger.info(f"批量更新订阅完成: {results}")

    db_results = []
    for result in results:
        try:
            name, _groups, proxies = db.check_valid(result)
            result.name = name
            db_results.append(
                db.add_subscription_meta(result, proxies, overwrite=sub.overwrite)
            )
        except Exception as e:
            result.error = 1
            result.detail = {"valid": str(e)}
            # Previously swallowed without a trace; log it so a rejected
            # subscription can be diagnosed.
            logger.error(f"subscription validation failed: {result.url}, error: {e}")
    return db_results
@router.get("/")
async def list_subscriptions() -> List[SubscriptFile]:
    """Return every subscription record reported by ``SubscriptionManager``.

    Returns:
        Whatever ``SubscriptionManager.get_subscription_meta`` yields,
        passed through unchanged.
    """
    return SubscriptionManager().get_subscription_meta()
@router.get("/proxies")
async def list_proxies() -> Dict[str, List[MihomoMeta]]:
    """Return the stored proxies as grouped by ``SubscriptionManager``.

    The earlier version first opened a session and ran ``select(MihomoMeta)``
    over the whole table, then threw the rows away; that redundant round
    trip has been dropped.

    Returns:
        The mapping produced by ``get_proxies_by_provider`` (presumably keyed
        by provider name, per the return annotation — confirm against the
        manager).
    """
    return SubscriptionManager().get_proxies_by_provider()
|