| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- from datetime import datetime
- from typing import Dict, List, Optional
- from sqlmodel import SQLModel, Field,Session,select,Relationship,func,update
- from sqlalchemy.engine import Engine
- from sqlalchemy.dialects.postgresql import JSON
- from pydantic import BaseModel
- import yaml
- from database.engine import get_session,create_engine,engine
- from config.logu import logger
- class SubscriptFile(SQLModel, table=True):
- id: Optional[int] = Field(default=None, primary_key=True)
- name: str = Field()
- url: str = Field(index=True)
- file_path: str = Field()
- updated_at: datetime = Field(default_factory=datetime.now)
- error: int = Field(default=0)
- detail: dict = Field(default={}, sa_type=JSON)
- mihomo_meta: List["MihomoMeta"] = Relationship(back_populates="subscript_file")
- class MihomoMeta(SQLModel, table=True):
- id: Optional[int] = Field(default=None, primary_key=True)
- provider_name: str = Field(default="")
- proxy_name: str = Field(default="")
- mixed_port: Optional[int] = Field(default=None)
- external_controller: Optional[str] = Field(default=None)
- temp_file_path: Optional[str] = Field(default=None)
- pid: Optional[int] = Field(default=None)
- running: Optional[bool] = False
- updated_at: datetime = Field(default_factory=datetime.now)
- delay: Optional[int] = Field(default=None) # 新增延迟字段
- subscript_file_id: Optional[int] = Field(default=None, foreign_key="subscriptfile.id")
- subscript_file: SubscriptFile | None = Relationship(back_populates="mihomo_meta")
-
- class SubscriptionManager:
- def __init__(self, db:Engine=None):
- self.engine:Engine = db or engine
- def add_subscription_meta(self, sub_model: SubscriptFile, proxies, overwrite:bool=False):
- with Session(self.engine) as session:
- exist_sub = session.exec(select(SubscriptFile).where(SubscriptFile.url == sub_model.url)).first()
- if exist_sub and not overwrite:
- logger.info(f"{sub_model.url} already exist, skip add")
- return exist_sub
- logger.info(f"exist_sub {exist_sub} overwrite {overwrite} proxies {len(proxies)}")
- if overwrite:
- # 删除与 exist_sub 相关的 MihomoMeta 记录
- session.exec(select(MihomoMeta).where(MihomoMeta.subscript_file_id == exist_sub.id)).all()
- for proxy in session.exec(select(MihomoMeta).where(MihomoMeta.subscript_file_id == exist_sub.id)).all():
- session.delete(proxy)
-
- # 删除旧的 SubscriptFile 记录
- session.delete(exist_sub)
- session.commit()
- logger.info(f"add {sub_model}")
- # 添加新的 SubscriptFile
- session.add(sub_model)
- session.commit()
- session.refresh(sub_model)
- # 添加 MihomoMeta 记录
- for proxy in proxies:
- miho = MihomoMeta(
- provider_name=sub_model.name,
- proxy_name=proxy["name"],
- subscript_file_id=sub_model.id # 使用 sub_model.id
- )
- logger.info(f"miho {miho}")
- session.add(miho)
-
- session.commit()
- session.refresh(sub_model)
- return sub_model
-
- def get_subscription_meta(self) -> List[SubscriptFile]:
- with Session(self.engine) as session:
- return session.exec(select(SubscriptFile)).all()
-
- def get_proxies(self) -> List[MihomoMeta]:
- with Session(self.engine) as session:
- return session.exec(select(MihomoMeta)).all()
-
- def get_running_proxies(self) -> List[MihomoMeta]:
- with Session(self.engine) as session:
- all = session.exec(
- select(MihomoMeta)
- .where(MihomoMeta.pid.is_not(None))
- ).all()
- return all
-
- def update_proxy_delays(self, provider_name: str, delays: dict):
- """更新指定服务商下所有代理的延迟"""
- with Session(self.engine) as session:
- # 先清空该provider下所有代理的延迟值
- session.exec(
- update(MihomoMeta)
- .where(MihomoMeta.provider_name == provider_name)
- .values(delay=None)
- )
-
- # 更新有延迟值的代理
- for proxy_name, delay in delays.items():
- if isinstance(delay, int): # 确保只更新有效的延迟值
- session.exec(
- update(MihomoMeta)
- .where(
- (MihomoMeta.provider_name == provider_name) &
- (MihomoMeta.proxy_name == proxy_name)
- )
- .values(delay=delay)
- )
-
- session.commit()
- def get_each_provider_proxies(self):
- with Session(self.engine) as session:
- # 子查询:获取每个 provider_name 的最小 id
- subquery = (
- select(
- MihomoMeta.provider_name,
- func.min(MihomoMeta.id).label("min_id")
- )
- .group_by(MihomoMeta.provider_name)
- .subquery()
- )
- # 主查询:通过联接到子查询获取每个 provider_name 的第一条记录
- stmt = (
- select(MihomoMeta)
- .join(subquery, MihomoMeta.id == subquery.c.min_id)
- )
- # 执行查询并获取结果
- return session.exec(stmt).all()
- def get_each_provider_running_proxies(self) -> List[MihomoMeta]:
- with Session(self.engine) as session:
- subquery = (
- select(
- MihomoMeta.provider_name,
- func.min(MihomoMeta.id).label("min_id")
- )
- .where(MihomoMeta.pid.is_not(None))
- .group_by(MihomoMeta.provider_name)
- .subquery()
- )
- # 主查询:通过联接到子查询获取每个 provider_name 的第一条记录
- stmt = (
- select(MihomoMeta)
- .join(subquery, MihomoMeta.id == subquery.c.min_id)
- )
- # 执行查询并获取结果
- return session.exec(stmt).all()
- def get_proxies_by_provider(self) -> Dict[str, List[MihomoMeta]]:
- """
- 返回一个字典,键是 provider_name,值是该 provider_name 对应的所有 MihomoMeta 记录列表。
- """
- with Session(self.engine) as session:
- all_proxies = session.exec(select(MihomoMeta)).all()
-
- # 使用字典来组织数据
- proxies_by_provider = {}
- for proxy in all_proxies:
- if proxy.provider_name not in proxies_by_provider:
- proxies_by_provider[proxy.provider_name] = []
- proxies_by_provider[proxy.provider_name].append(proxy)
-
- return proxies_by_provider
- def check_valid(self, sub_model:SubscriptFile):
- with open(sub_model.file_path, "r",encoding='utf-8') as f:
- sub_yaml = yaml.safe_load(f)
- groups = sub_yaml.get("proxy-groups", [])
- if not groups:
- raise ValueError("subscription file is not valid")
- name = groups[0].get("name", "")
- if not name:
- raise ValueError("subscription file is not valid")
- proxies = sub_yaml.get("proxies", [])
- if not proxies:
- raise ValueError("subscription file is not valid")
- fileter_proxies = []
- fileter_proxies = []
- keywords = ['流量', '套餐', '剩余', '测试']
- for proxy in proxies:
- if not any(keyword in proxy.get("name", "") for keyword in keywords):
- fileter_proxies.append(proxy)
- return name, groups,proxies
|