from jinja2 import Environment, BaseLoader import json from bson import json_util from src.manager.core.db_mongo import BaseMongoManager import asyncio from utils.logu import get_logger from src.models.field_config import FieldConfig, get_field_descriptions logger = get_logger('test') class MongoAggregationTemplate: def __init__(self): self.env = Environment(loader=BaseLoader()) # 添加自定义过滤器 self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default) def to_template_string(self, pipeline): """将聚合管道转换为模板字符串""" return json.dumps(pipeline, default=json_util.default) def render(self, template_str, context): """渲染模板字符串为可执行的聚合管道""" template = self.env.from_string(template_str) rendered = template.render(**context) try: return json.loads(rendered, object_hook=json_util.object_hook) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON after rendering: {e}\nRendered content:\n{rendered}") async def filter_aggregate_demo(product_name="电线保护套"): # 初始化模板处理器 template_processor = MongoAggregationTemplate() # 定义聚合管道模板 filter_competior_by_name = [ { '$match': { 'basic_info.name': "{{ product_name }}" } }, { '$project': { 'competitor_crawl_data': 1 } }, { '$addFields': { 'competitors': { '$objectToArray': '$competitor_crawl_data' } } }, { '$unwind': '$competitors' }, { '$project': { '_id': 0, 'asin': '$competitors.k', 'product_info': { 'main_text': '$competitors.v.extra_result.product_info.main_text' }, 'result_table': { '$map': { 'input': '$competitors.v.extra_result.result_table', 'as': 'item', 'in': { 'traffic_keyword': '$$item.traffic_keyword', 'monthly_searches': '$$item.monthly_searches' } } } } } ] # 将聚合管道转换为模板字符串 template_str = template_processor.to_template_string(filter_competior_by_name) logger.info(f"Template string: {template_str}") # 初始化数据库 db_manager = BaseMongoManager(db_name='test') await db_manager.initialize() db = db_manager.db # 渲染模板 try: pipeline = template_processor.render(template_str, { "product_name": product_name, # 可以添加更多变量 }) logger.info(f"Rendered pipeline: {json.dumps(pipeline, indent=2, ensure_ascii=False)}") except ValueError as e: logger.error(f"Template rendering failed: {e}") return # 执行聚合查询 result = await db.Product.aggregate(pipeline).to_list() logger.info(json.dumps(result, ensure_ascii=False)) # 测试代码 if __name__ == "__main__": asyncio.run(filter_aggregate_demo())