from datetime import datetime from typing import List, Optional from sqlmodel import SQLModel, Field,Session,select,Relationship from sqlalchemy.engine import Engine from sqlalchemy.dialects.postgresql import JSON from pydantic import BaseModel import yaml from database.engine import get_session,create_engine,engine class SubscriptFile(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) url: str = Field(index=True) file_path: str = Field() updated_at: datetime = Field(default_factory=datetime.now) error: int = Field(default=0) detail: dict = Field(default={}, sa_type=JSON) mihomo_meta: List["MihomoMeta"] = Relationship(back_populates="subscript_file") class MihomoMeta(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) provider_name: str proxy_name: str mixed_port: Optional[int] external_controller: Optional[str] temp_file_path: Optional[str] pid: Optional[int] running: Optional[bool] = False updated_at: datetime = Field(default_factory=datetime.now) subscript_file_id: Optional[int] = Field(default=None, foreign_key="subscriptfile.id") subscript_file: SubscriptFile | None = Relationship(back_populates="mihomo_meta") class SubscriptionManager: def __init__(self, db:Engine=None): self.engine:Engine = db or engine def add_subscription_meta(self, sub_model: SubscriptFile, overwrite:bool=False): with Session(self.engine) as session: exist_sub = session.exec(select(SubscriptFile).where(SubscriptFile.url == sub_model.url)).first() if exist_sub and not overwrite: return exist_sub name, groups = self._check_valid(sub_model) # 如果存在且需要覆盖,则删除旧的 MihomoMeta 记录 if exist_sub and overwrite: session.exec(select(MihomoMeta).where(MihomoMeta.subscript_file_id == exist_sub.id)).delete() session.delete(exist_sub) session.commit() # 添加新的 SubscriptFile session.add(sub_model) session.commit() session.refresh(sub_model) # 添加 MihomoMeta 记录 for group in groups: miho = MihomoMeta( provider_name=name, proxy_name=group["name"], subscript_file_id=sub_model.id # 使用 sub_model.id ) session.add(miho) session.commit() session.refresh(sub_model) return sub_model def get_subscription_meta(self) -> List[SubscriptFile]: with Session(self.engine) as session: return session.exec(select(SubscriptFile)).all() def _check_valid(self, sub_model:SubscriptFile): with open(sub_model.file_path, "r",encoding='utf-8') as f: sub_yaml = yaml.safe_load(f) groups = sub_yaml.get("proxy-groups", []) if not groups: raise ValueError("subscription file is not valid") name = groups[0].get("name", "") if not name: raise ValueError("subscription file is not valid") return name, groups # ret.append( # SubscriptionResponse( # file_name=Path(sub.file_path).name, # provider_name=name, # updated_at=sub.updated_at, # proxies=sub_yaml.get("proxies", []), # ) # ) # return ret