| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- from typing import Optional, List, Dict, Any
- from src.models.template_model import Template, TemplateType
- from src.manager.core.db_mongo import BaseMongoManager
- from jinja2 import Environment, BaseLoader
- import json
- from bson import json_util
- from utils.logu import get_logger
- import asyncio
- logger = get_logger('template')
- class TemplateManager(BaseMongoManager):
- def __init__(self):
- super().__init__()
- self.env = Environment(loader=BaseLoader())
- self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
- async def initialize(self):
- await super().initialize()
-
- async def create_template(self, name: str, template_str: str,
- template_type: TemplateType,
- description: str = None,
- collection_name: str = None) -> Template:
- """创建新模板"""
- template = Template(
- name=name,
- template_str=template_str,
- template_type=template_type,
- description=description,
- collection_name=collection_name
- )
- await template.insert()
- return template
- async def create_or_update_template(self, name: str, template_str: str,
- template_type: TemplateType,
- description: str = None,
- collection_name: str = None,
- if_exists: str = "update") -> Template:
- """
- 创建模板,如果已存在则根据if_exists参数处理
- Args:
- name: 模板名称
- template_str: 模板字符串
- template_type: 模板类型
- description: 模板描述
- if_exists: 存在时的处理方式 ('update'或'ignore',默认为'update')
- Returns:
- 创建或更新后的模板对象
- Raises:
- ValueError: 如果if_exists参数值无效
- """
- existing = await self.get_template(name)
- if existing:
- if if_exists == "ignore":
- return existing
- elif if_exists == "update":
- return await self.update_template(
- name=name,
- new_template_str=template_str,
- new_description=description,
- new_collection_name=collection_name
- )
- else:
- raise ValueError(f"Invalid if_exists value: {if_exists}. Must be 'update' or 'ignore'")
- return await self.create_template(
- name=name,
- template_str=template_str,
- template_type=template_type,
- description=description,
- collection_name=collection_name
- )
- async def get_template(self, name: str) -> Optional[Template]:
- """根据名称获取模板"""
- await self._ensure_initialized()
- return await Template.find_one(Template.name == name)
- async def update_template(self, name: str, new_template_str: str = None,
- new_description: str = None,
- new_collection_name: str = None) -> Optional[Template]:
- """更新模板"""
- template = await self.get_template(name)
- if not template:
- return None
-
- if new_template_str:
- template.template_str = new_template_str
- if new_description:
- template.description = new_description
- if new_collection_name is not None:
- template.collection_name = new_collection_name
-
- await template.update_timestamp()
- return template
- async def delete_template(self, name: str) -> bool:
- """删除模板"""
- template = await self.get_template(name)
- if template:
- await template.delete()
- return True
- return False
- async def list_templates(self, template_type: TemplateType = None) -> List[Template]:
- """列出所有模板,可过滤类型"""
- await self._ensure_initialized()
- if template_type:
- return await Template.find(Template.template_type == template_type).to_list()
- return await Template.all().to_list()
- def render_template(self, template_str: str, context: Dict[str, Any]):
- """渲染模板字符串"""
- template = self.env.from_string(template_str)
- rendered = template.render(**context)
-
- try:
- return json.loads(rendered, object_hook=json_util.object_hook)
- except json.JSONDecodeError as e:
- logger.error(f"Template rendering failed: {e}\nRendered content:\n{rendered}")
- raise ValueError(f"Invalid JSON after rendering: {e}")
- async def execute_template(self, name: str, context: Dict[str, Any],
- collection_name: str = None):
- """执行模板查询"""
- template = await self.get_template(name)
- if not template:
- raise ValueError(f"Template '{name}' not found")
-
- pipeline = self.render_template(template.template_str, context)
-
- if not collection_name and template.collection_name:
- collection_name = template.collection_name
- elif not collection_name and template.template_type == TemplateType.AGGREGATION:
- collection_name = "Product" # 默认集合
-
- if not collection_name:
- raise ValueError("Collection name is required for this template type")
-
- return await self.db[collection_name].aggregate(pipeline).to_list()
- class TemplateService:
- """供外部程序使用的服务类"""
- _instance = None
- _lock = asyncio.Lock()
-
- @classmethod
- async def get_instance(cls):
- if cls._instance is None:
- async with cls._lock:
- if cls._instance is None:
- cls._instance = TemplateManager()
- await cls._instance.initialize()
- return cls._instance
|