from typing import Optional, List, Dict, Any from src.models.template_model import Template, TemplateType from src.manager.core.db_mongo import BaseMongoManager from jinja2 import Environment, BaseLoader import json from bson import json_util from utils.logu import get_logger import asyncio logger = get_logger('template') class TemplateManager(BaseMongoManager): def __init__(self): super().__init__() self.env = Environment(loader=BaseLoader()) self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default) async def initialize(self): await super().initialize() async def create_template(self, name: str, template_str: str, template_type: TemplateType, description: str = None, collection_name: str = None) -> Template: """创建新模板""" template = Template( name=name, template_str=template_str, template_type=template_type, description=description, collection_name=collection_name ) await template.insert() return template async def create_or_update_template(self, name: str, template_str: str, template_type: TemplateType, description: str = None, collection_name: str = None, if_exists: str = "update") -> Template: """ 创建模板,如果已存在则根据if_exists参数处理 Args: name: 模板名称 template_str: 模板字符串 template_type: 模板类型 description: 模板描述 if_exists: 存在时的处理方式 ('update'或'ignore',默认为'update') Returns: 创建或更新后的模板对象 Raises: ValueError: 如果if_exists参数值无效 """ existing = await self.get_template(name) if existing: if if_exists == "ignore": return existing elif if_exists == "update": return await self.update_template( name=name, new_template_str=template_str, new_description=description, new_collection_name=collection_name ) else: raise ValueError(f"Invalid if_exists value: {if_exists}. Must be 'update' or 'ignore'") return await self.create_template( name=name, template_str=template_str, template_type=template_type, description=description, collection_name=collection_name ) async def get_template(self, name: str) -> Optional[Template]: """根据名称获取模板""" await self._ensure_initialized() return await Template.find_one(Template.name == name) async def update_template(self, name: str, new_template_str: str = None, new_description: str = None, new_collection_name: str = None) -> Optional[Template]: """更新模板""" template = await self.get_template(name) if not template: return None if new_template_str: template.template_str = new_template_str if new_description: template.description = new_description if new_collection_name is not None: template.collection_name = new_collection_name await template.update_timestamp() return template async def delete_template(self, name: str) -> bool: """删除模板""" template = await self.get_template(name) if template: await template.delete() return True return False async def list_templates(self, template_type: TemplateType = None) -> List[Template]: """列出所有模板,可过滤类型""" await self._ensure_initialized() if template_type: return await Template.find(Template.template_type == template_type).to_list() return await Template.all().to_list() def render_template(self, template_str: str, context: Dict[str, Any]): """渲染模板字符串""" template = self.env.from_string(template_str) rendered = template.render(**context) try: return json.loads(rendered, object_hook=json_util.object_hook) except json.JSONDecodeError as e: logger.error(f"Template rendering failed: {e}\nRendered content:\n{rendered}") raise ValueError(f"Invalid JSON after rendering: {e}") async def execute_template(self, name: str, context: Dict[str, Any], collection_name: str = None): """执行模板查询""" template = await self.get_template(name) if not template: raise ValueError(f"Template '{name}' not found") pipeline = self.render_template(template.template_str, context) if not collection_name and template.collection_name: collection_name = template.collection_name elif not collection_name and template.template_type == TemplateType.AGGREGATION: collection_name = "Product" # 默认集合 if not collection_name: raise ValueError("Collection name is required for this template type") return await self.db[collection_name].aggregate(pipeline).to_list() class TemplateService: """供外部程序使用的服务类""" _instance = None _lock = asyncio.Lock() @classmethod async def get_instance(cls): if cls._instance is None: async with cls._lock: if cls._instance is None: cls._instance = TemplateManager() await cls._instance.initialize() return cls._instance