from datetime import datetime from typing import Dict, List, Optional from sqlmodel import SQLModel, Field,Session,select,Relationship from sqlalchemy.engine import Engine from sqlalchemy.dialects.postgresql import JSON from pydantic import BaseModel import yaml from database.engine import get_session,create_engine,engine from config.logu import logger class SubscriptFile(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str = Field() url: str = Field(index=True) file_path: str = Field() updated_at: datetime = Field(default_factory=datetime.now) error: int = Field(default=0) detail: dict = Field(default={}, sa_type=JSON) mihomo_meta: List["MihomoMeta"] = Relationship(back_populates="subscript_file") class MihomoMeta(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) provider_name: str proxy_name: str mixed_port: Optional[int] external_controller: Optional[str] temp_file_path: Optional[str] pid: Optional[int] running: Optional[bool] = False updated_at: datetime = Field(default_factory=datetime.now) subscript_file_id: Optional[int] = Field(default=None, foreign_key="subscriptfile.id") subscript_file: SubscriptFile | None = Relationship(back_populates="mihomo_meta") class SubscriptionManager: def __init__(self, db:Engine=None): self.engine:Engine = db or engine def add_subscription_meta(self, sub_model: SubscriptFile, proxies, overwrite:bool=False): with Session(self.engine) as session: exist_sub = session.exec(select(SubscriptFile).where(SubscriptFile.url == sub_model.url)).first() if exist_sub and not overwrite: logger.info(f"{sub_model.url} already exist, skip add") return exist_sub logger.info(f"exist_sub {exist_sub} overwrite {overwrite} proxies {proxies}") if exist_sub and overwrite: # 删除与 exist_sub 相关的 MihomoMeta 记录 session.exec(select(MihomoMeta).where(MihomoMeta.subscript_file_id == exist_sub.id)).all() for proxy in session.exec(select(MihomoMeta).where(MihomoMeta.subscript_file_id == exist_sub.id)).all(): session.delete(proxy) # 删除旧的 SubscriptFile 记录 session.delete(exist_sub) session.commit() # 添加新的 SubscriptFile session.add(sub_model) session.commit() session.refresh(sub_model) # 添加 MihomoMeta 记录 for proxy in proxies: miho = MihomoMeta( provider_name=sub_model.name, proxy_name=proxy["name"], subscript_file_id=sub_model.id # 使用 sub_model.id ) logger.info(f"miho {miho}") session.add(miho) session.commit() session.refresh(sub_model) return sub_model def get_subscription_meta(self) -> List[SubscriptFile]: with Session(self.engine) as session: return session.exec(select(SubscriptFile)).all() def get_proxies(self) -> List[MihomoMeta]: with Session(self.engine) as session: return session.exec(select(MihomoMeta)).all() def get_proxies_by_provider(self) -> Dict[str, List[MihomoMeta]]: """ 返回一个字典,键是 provider_name,值是该 provider_name 对应的所有 MihomoMeta 记录列表。 """ with Session(self.engine) as session: all_proxies = session.exec(select(MihomoMeta)).all() # 使用字典来组织数据 proxies_by_provider = {} for proxy in all_proxies: if proxy.provider_name not in proxies_by_provider: proxies_by_provider[proxy.provider_name] = [] proxies_by_provider[proxy.provider_name].append(proxy) return proxies_by_provider def check_valid(self, sub_model:SubscriptFile): with open(sub_model.file_path, "r",encoding='utf-8') as f: sub_yaml = yaml.safe_load(f) groups = sub_yaml.get("proxy-groups", []) if not groups: raise ValueError("subscription file is not valid") name = groups[0].get("name", "") if not name: raise ValueError("subscription file is not valid") proxies = sub_yaml.get("proxies", []) if not proxies: raise ValueError("subscription file is not valid") fileter_proxies = [] fileter_proxies = [] keywords = ['流量', '套餐', '剩余', '测试'] for proxy in proxies: if not any(keyword in proxy.get("name", "") for keyword in keywords): fileter_proxies.append(proxy) return name, groups,proxies # ret.append( # SubscriptionResponse( # file_name=Path(sub.file_path).name, # provider_name=name, # updated_at=sub.updated_at, # proxies=sub_yaml.get("proxies", []), # ) # ) # return ret