subscriptions.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import asyncio
  2. import hashlib
  3. from pathlib import Path
  4. from fastapi import APIRouter, HTTPException
  5. from datetime import datetime, timedelta
  6. from pydantic import BaseModel
  7. from typing import Dict, List, Optional
  8. import httpx
  9. import yaml
  10. from config.logu import logger, get_logger
  11. from config.settings import settings
  12. from utils.sub import async_get_sub
  13. from database.engine import engine,get_session
  14. from sqlmodel import Session
  15. from database.models.subscription import SubscriptionManager,SubscriptFile
  16. router = APIRouter()
  17. class SubscriptionBatchRequest(BaseModel):
  18. urls: List[str]
  19. class SubscriptionResponse(BaseModel):
  20. file_name: str
  21. provider_name: str
  22. updated_at: datetime
  23. proxies: List[Dict]
  24. async def process_url(url: str, save_path) -> SubscriptFile:
  25. """处理单个URL的异步任务"""
  26. try:
  27. save_path_res = await async_get_sub(
  28. url,
  29. save_path,
  30. timeout=5
  31. )
  32. return SubscriptFile(
  33. url=url,
  34. file_path=str(save_path_res),
  35. updated_at=datetime.now(),
  36. error=0,
  37. detail={"msg": "更新订阅成功"}
  38. )
  39. except Exception as e:
  40. logger.error(f"更新订阅失败: {url}, 错误: {str(e)}")
  41. return SubscriptFile(
  42. url=url,
  43. file_path="",
  44. updated_at=datetime.now(),
  45. error=1,
  46. detail={"msg": f"更新订阅失败: {str(e)}"}
  47. )
  48. @router.post("/")
  49. async def add_subscriptions(sub: SubscriptionBatchRequest) -> List[SubscriptFile]:
  50. """批量更新订阅链接"""
  51. logger.info(f"开始批量更新订阅: {sub.urls}")
  52. # 初始化任务列表
  53. tasks = []
  54. # 并发处理所有URL
  55. for url in sub.urls:
  56. save_path = settings.PATH_SUBSCRIPTION_DIR / f"{hashlib.md5(url.encode()).hexdigest()[:8]}.yaml"
  57. # 将每个URL的异步任务添加到任务列表中
  58. tasks.append(process_url(url, save_path))
  59. results = await asyncio.gather(*tasks)
  60. db_results = []
  61. db = SubscriptionManager()
  62. for result in results:
  63. db_results.append(
  64. db.add_subscription_meta(result)
  65. )
  66. return db_results
  67. @router.get("/")
  68. async def list_subscriptions() ->List[SubscriptionResponse]:
  69. ret = []
  70. db = SubscriptionManager()
  71. db_sub_models = db.get_subscription_meta()