subscriptions.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import asyncio
  2. import hashlib
  3. from pathlib import Path
  4. from fastapi import APIRouter, HTTPException
  5. from datetime import datetime, timedelta
  6. from pydantic import BaseModel
  7. from typing import Dict, List, Optional
  8. import httpx
  9. import yaml
  10. from config.logu import logger, get_logger
  11. from config.settings import settings
  12. from utils.sub import async_get_sub
  13. from database.engine import engine,get_session
  14. from sqlmodel import Session, select
  15. from database.models.subscription import SubscriptionManager,SubscriptFile,MihomoMeta
# Router that the endpoint decorators in this module (`@router.post`,
# `@router.get`) register their routes on.
router = APIRouter()
class SubscriptionBatchRequest(BaseModel):
    # Request body accepted by the POST "/" endpoint (`add_subscriptions`).

    # Subscription URLs to process in this batch.
    urls: List[str]
    # When False (the default), URLs that are already stored are skipped;
    # the flag is also forwarded to `SubscriptionManager.add_subscription_meta`.
    overwrite: bool = False
class SubscriptionResponse(BaseModel):
    # Pydantic model describing a subscription together with its proxies.
    # NOTE(review): no endpoint in the visible part of this file references
    # this model — confirm whether it is still used elsewhere.

    file_name: str
    provider_name: str
    # Presumably the time the subscription was last refreshed — TODO confirm.
    updated_at: datetime
    # Presumably one dict per proxy entry; the dict schema is not defined here.
    proxies: List[Dict]
  25. async def process_url(url: str, save_path) -> SubscriptFile:
  26. """处理单个URL的异步任务"""
  27. try:
  28. save_path_res = await async_get_sub(
  29. url,
  30. save_path,
  31. timeout=5
  32. )
  33. return SubscriptFile(
  34. url=url,
  35. file_path=str(save_path_res),
  36. updated_at=datetime.now(),
  37. error=0,
  38. detail={"msg": "更新订阅成功"}
  39. )
  40. except Exception as e:
  41. logger.error(f"更新订阅失败: {url}, 错误: {str(e)}")
  42. return SubscriptFile(
  43. url=url,
  44. file_path="",
  45. updated_at=datetime.now(),
  46. error=1,
  47. detail={"msg": f"更新订阅失败: {str(e)}"}
  48. )
  49. @router.post("/")
  50. async def add_subscriptions(sub: SubscriptionBatchRequest) -> List[SubscriptFile]:
  51. """批量更新订阅链接"""
  52. logger.info(f"开始批量更新订阅: {sub.urls}")
  53. db = SubscriptionManager()
  54. subscription_meta_list = db.get_subscription_meta()
  55. urls = [subscription_meta.url for subscription_meta in subscription_meta_list if subscription_meta.url ]
  56. logger.info(f"exist urls {urls}")
  57. # 初始化任务列表
  58. tasks = []
  59. # 并发处理所有URL
  60. for url in sub.urls:
  61. save_path = settings.PATH_SUBSCRIPTION_DIR / f"{hashlib.md5(url.encode()).hexdigest()[:8]}.yaml"
  62. if url in urls and not sub.overwrite:
  63. logger.info(f"{url} already exist, skip add")
  64. continue
  65. # 将每个URL的异步任务添加到任务列表中
  66. tasks.append(process_url(url, save_path))
  67. if not tasks:
  68. return subscription_meta_list
  69. results:List[SubscriptFile] = await asyncio.gather(*tasks)
  70. logger.info(f"批量更新订阅完成: {results}")
  71. db_results = []
  72. for result in results:
  73. try:
  74. name,groups,proxies = db.check_valid(result)
  75. result.name = name
  76. db_results.append(
  77. db.add_subscription_meta(result, proxies, overwrite=sub.overwrite)
  78. )
  79. except Exception as e:
  80. result.error = 1
  81. result.detail = {"valid": str(e)}
  82. return db_results
  83. @router.get("/")
  84. async def list_subscriptions() ->List[SubscriptFile]:
  85. ret = []
  86. db = SubscriptionManager()
  87. db_sub_models = db.get_subscription_meta()
  88. return db_sub_models
  89. @router.get("/proxies")
  90. async def list_proxies() -> Dict[str, List[MihomoMeta]]:
  91. db = SubscriptionManager()
  92. with Session(db.engine) as session:
  93. session.exec(select(MihomoMeta)).all()
  94. return db.get_proxies_by_provider()