| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- from jinja2 import Environment, BaseLoader
- import json
- from bson import json_util
- from src.manager.core.db_mongo import BaseMongoManager
- import asyncio
- from utils.logu import get_logger
- from src.models.field_config import FieldConfig, get_field_descriptions
# Module-level logger used by the demo code in this file.
logger = get_logger("test")
class MongoAggregationTemplate:
    """Round-trip MongoDB aggregation pipelines through Jinja2 templates.

    ``to_template_string`` serializes a pipeline to JSON text in which
    ``{{ ... }}`` placeholders survive as ordinary string content, and
    ``render`` substitutes the placeholders and parses the text back into a
    pipeline.

    Values are substituted into the JSON text verbatim. A value rendered
    inside a quoted JSON string must therefore already be JSON-escaped by the
    caller; alternatively, render it unquoted through the ``tojson`` filter,
    which emits a complete JSON value.
    """

    def __init__(self):
        self.env = Environment(loader=BaseLoader())
        # Register a custom ``tojson`` filter: plain ``json.dumps`` that
        # delegates values json cannot encode itself to ``json_util.default``.
        self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)

    def to_template_string(self, pipeline) -> str:
        """Serialize an aggregation pipeline into a template string.

        Args:
            pipeline: The pipeline to serialize; string values may contain
                Jinja2 placeholders such as ``"{{ name }}"``.

        Returns:
            The JSON text of ``pipeline``.
        """
        return json.dumps(pipeline, default=json_util.default)

    def render(self, template_str: str, context):
        """Render a template string into an executable aggregation pipeline.

        Args:
            template_str: JSON text containing Jinja2 placeholders.
            context: Mapping of template variable names to values; it is
                expanded as keyword arguments to the template.

        Returns:
            The parsed JSON, decoded with ``json_util.object_hook``.

        Raises:
            ValueError: If the rendered text is not valid JSON. The message
                includes the rendered text and the original decode error is
                attached as ``__cause__``.
        """
        template = self.env.from_string(template_str)
        rendered = template.render(**context)

        try:
            return json.loads(rendered, object_hook=json_util.object_hook)
        except json.JSONDecodeError as e:
            # Chain explicitly so the traceback shows the decode error as the
            # direct cause rather than as an incidental context.
            raise ValueError(
                f"Invalid JSON after rendering: {e}\nRendered content:\n{rendered}"
            ) from e
async def filter_aggregate_demo(product_name="电线保护套"):
    """Demo: query competitor keyword data for the product named *product_name*.

    Builds an aggregation pipeline whose ``$match`` stage contains a
    ``{{ product_name }}`` placeholder, renders it through
    ``MongoAggregationTemplate``, runs it against ``db.Product`` of the
    ``test`` database and logs the result.

    Args:
        product_name: Value matched against ``basic_info.name``.

    Returns:
        None. The rendered pipeline and the query result are only logged. If
        rendering fails, the error is logged and the query is skipped.
    """
    # Set up the template processor.
    template_processor = MongoAggregationTemplate()

    # Aggregation pipeline template: select the product by name, turn its
    # competitor_crawl_data mapping into one document per entry, and keep
    # only the fields projected in the last stage.
    competitor_pipeline = [
        {
            '$match': {
                'basic_info.name': "{{ product_name }}"
            }
        },
        {
            '$project': {
                'competitor_crawl_data': 1
            }
        },
        {
            '$addFields': {
                'competitors': {
                    '$objectToArray': '$competitor_crawl_data'
                }
            }
        },
        {
            '$unwind': '$competitors'
        },
        {
            '$project': {
                '_id': 0,
                'asin': '$competitors.k',
                'product_info': {
                    'main_text': '$competitors.v.extra_result.product_info.main_text'
                },
                'result_table': {
                    '$map': {
                        'input': '$competitors.v.extra_result.result_table',
                        'as': 'item',
                        'in': {
                            'traffic_keyword': '$$item.traffic_keyword',
                            'monthly_searches': '$$item.monthly_searches'
                        }
                    }
                }
            }
        },
    ]

    # Convert the pipeline into a template string.
    template_str = template_processor.to_template_string(competitor_pipeline)
    logger.info(f"Template string: {template_str}")

    # Initialize the database.
    db_manager = BaseMongoManager(db_name='test')
    await db_manager.initialize()
    db = db_manager.db

    # The placeholder sits inside a quoted JSON string and the value is
    # substituted verbatim, so it has to be JSON-escaped first (without the
    # surrounding quotes). Otherwise a name containing `"` or `\` produces
    # invalid JSON or rewrites the $match stage.
    escaped_name = json.dumps(str(product_name), ensure_ascii=False)[1:-1]

    # Render the template.
    try:
        pipeline = template_processor.render(template_str, {
            "product_name": escaped_name,
            # More variables can be added here.
        })
    except ValueError as e:
        logger.error(f"Template rendering failed: {e}")
        return

    # json_util.default keeps logging from failing on BSON values.
    rendered_dump = json.dumps(
        pipeline, indent=2, ensure_ascii=False, default=json_util.default
    )
    logger.info(f"Rendered pipeline: {rendered_dump}")

    # Run the aggregation query.
    # NOTE(review): depending on the async driver behind BaseMongoManager,
    # to_list() may require an explicit length argument - confirm.
    result = await db.Product.aggregate(pipeline).to_list()
    logger.info(json.dumps(result, ensure_ascii=False, default=json_util.default))
# Test entry point: run the demo once with its default product name.
if __name__ == "__main__":
    demo_coroutine = filter_aggregate_demo()
    asyncio.run(demo_coroutine)
|