template_manager.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. from typing import Optional, List, Dict, Any
  2. from src.models.template_model import Template, TemplateType
  3. from src.manager.core.db_mongo import BaseMongoManager
  4. from jinja2 import Environment, BaseLoader
  5. import json
  6. from bson import json_util
  7. from utils.logu import get_logger
  8. import asyncio
  9. logger = get_logger('template')
  10. class TemplateManager(BaseMongoManager):
  11. def __init__(self):
  12. super().__init__()
  13. self.env = Environment(loader=BaseLoader())
  14. self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
  15. async def initialize(self):
  16. await super().initialize()
  17. async def create_template(self, name: str, template_str: str,
  18. template_type: TemplateType,
  19. description: str = None,
  20. collection_name: str = None) -> Template:
  21. """创建新模板"""
  22. template = Template(
  23. name=name,
  24. template_str=template_str,
  25. template_type=template_type,
  26. description=description,
  27. collection_name=collection_name
  28. )
  29. await template.insert()
  30. return template
  31. async def create_or_update_template(self, name: str, template_str: str,
  32. template_type: TemplateType,
  33. description: str = None,
  34. collection_name: str = None,
  35. if_exists: str = "update") -> Template:
  36. """
  37. 创建模板,如果已存在则根据if_exists参数处理
  38. Args:
  39. name: 模板名称
  40. template_str: 模板字符串
  41. template_type: 模板类型
  42. description: 模板描述
  43. if_exists: 存在时的处理方式 ('update'或'ignore',默认为'update')
  44. Returns:
  45. 创建或更新后的模板对象
  46. Raises:
  47. ValueError: 如果if_exists参数值无效
  48. """
  49. existing = await self.get_template(name)
  50. if existing:
  51. if if_exists == "ignore":
  52. return existing
  53. elif if_exists == "update":
  54. return await self.update_template(
  55. name=name,
  56. new_template_str=template_str,
  57. new_description=description,
  58. new_collection_name=collection_name
  59. )
  60. else:
  61. raise ValueError(f"Invalid if_exists value: {if_exists}. Must be 'update' or 'ignore'")
  62. return await self.create_template(
  63. name=name,
  64. template_str=template_str,
  65. template_type=template_type,
  66. description=description,
  67. collection_name=collection_name
  68. )
  69. async def get_template(self, name: str) -> Optional[Template]:
  70. """根据名称获取模板"""
  71. await self._ensure_initialized()
  72. return await Template.find_one(Template.name == name)
  73. async def update_template(self, name: str, new_template_str: str = None,
  74. new_description: str = None,
  75. new_collection_name: str = None) -> Optional[Template]:
  76. """更新模板"""
  77. template = await self.get_template(name)
  78. if not template:
  79. return None
  80. if new_template_str:
  81. template.template_str = new_template_str
  82. if new_description:
  83. template.description = new_description
  84. if new_collection_name is not None:
  85. template.collection_name = new_collection_name
  86. await template.update_timestamp()
  87. return template
  88. async def delete_template(self, name: str) -> bool:
  89. """删除模板"""
  90. template = await self.get_template(name)
  91. if template:
  92. await template.delete()
  93. return True
  94. return False
  95. async def list_templates(self, template_type: TemplateType = None) -> List[Template]:
  96. """列出所有模板,可过滤类型"""
  97. await self._ensure_initialized()
  98. if template_type:
  99. return await Template.find(Template.template_type == template_type).to_list()
  100. return await Template.all().to_list()
  101. def render_template(self, template_str: str, context: Dict[str, Any]):
  102. """渲染模板字符串"""
  103. template = self.env.from_string(template_str)
  104. rendered = template.render(**context)
  105. try:
  106. return json.loads(rendered, object_hook=json_util.object_hook)
  107. except json.JSONDecodeError as e:
  108. logger.error(f"Template rendering failed: {e}\nRendered content:\n{rendered}")
  109. raise ValueError(f"Invalid JSON after rendering: {e}")
  110. async def execute_template(self, name: str, context: Dict[str, Any],
  111. collection_name: str = None):
  112. """执行模板查询"""
  113. template = await self.get_template(name)
  114. if not template:
  115. raise ValueError(f"Template '{name}' not found")
  116. pipeline = self.render_template(template.template_str, context)
  117. if not collection_name and template.collection_name:
  118. collection_name = template.collection_name
  119. elif not collection_name and template.template_type == TemplateType.AGGREGATION:
  120. collection_name = "Product" # 默认集合
  121. if not collection_name:
  122. raise ValueError("Collection name is required for this template type")
  123. return await self.db[collection_name].aggregate(pipeline).to_list()
  124. class TemplateService:
  125. """供外部程序使用的服务类"""
  126. _instance = None
  127. _lock = asyncio.Lock()
  128. @classmethod
  129. async def get_instance(cls):
  130. if cls._instance is None:
  131. async with cls._lock:
  132. if cls._instance is None:
  133. cls._instance = TemplateManager()
  134. await cls._instance.initialize()
  135. return cls._instance