MongoDB筛选和投影.md 13 KB

模板化存取

from jinja2 import Environment, BaseLoader
import json
from bson import json_util
from src.manager.core.db_mongo import BaseMongoManager
import asyncio
from utils.logu import get_logger
from src.models.field_config import FieldConfig, get_field_descriptions

logger = get_logger('test')

class MongoAggregationTemplate:
    def __init__(self):
        self.env = Environment(loader=BaseLoader())
        # 添加自定义过滤器
        self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
    
    def to_template_string(self, pipeline):
        """将聚合管道转换为模板字符串"""
        return json.dumps(pipeline, default=json_util.default)
    
    def render(self, template_str, context):
        """渲染模板字符串为可执行的聚合管道"""
        template = self.env.from_string(template_str)
        rendered = template.render(**context)
        
        try:
            return json.loads(rendered, object_hook=json_util.object_hook)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON after rendering: {e}\nRendered content:\n{rendered}")

async def filter_aggregate_demo(product_name="电线保护套"):
    # 初始化模板处理器
    template_processor = MongoAggregationTemplate()
    
    # 定义聚合管道模板
    filter_competior_by_name = [
        {
            '$match': {
                'basic_info.name': "{{ product_name }}"
            }  
        },
        {
            '$project': {
                'competitor_crawl_data': 1
            }
        }, {
            '$addFields': {
                'competitors': {
                    '$objectToArray': '$competitor_crawl_data'
                }
            }
        }, {
            '$unwind': '$competitors'
        }, {
            '$project': {
                '_id': 0, 
                'asin': '$competitors.k', 
                'product_info': {
                    'main_text': '$competitors.v.extra_result.product_info.main_text'
                }, 
                'result_table': {
                    '$map': {
                        'input': '$competitors.v.extra_result.result_table', 
                        'as': 'item', 
                        'in': {
                            'traffic_keyword': '$$item.traffic_keyword', 
                            'monthly_searches': '$$item.monthly_searches'
                        }
                    }
                }
            }
        }
    ]
    
    # 将聚合管道转换为模板字符串
    template_str = template_processor.to_template_string(filter_competior_by_name)
    logger.info(f"Template string: {template_str}")
    
    # 初始化数据库
    db_manager = BaseMongoManager(db_name='test')
    await db_manager.initialize()
    db = db_manager.db
    
    # 渲染模板
    try:
        pipeline = template_processor.render(template_str, {
            "product_name": product_name,
            # 可以添加更多变量
        })
        logger.info(f"Rendered pipeline: {json.dumps(pipeline, indent=2, ensure_ascii=False)}")
    except ValueError as e:
        logger.error(f"Template rendering failed: {e}")
        return
    
    # 执行聚合查询
    result = await db.Product.aggregate(pipeline).to_list()
    logger.info(json.dumps(result, ensure_ascii=False))

# 测试代码
if __name__ == "__main__":
    asyncio.run(filter_aggregate_demo())

参考这个示例,如果我想实现将模板化从 MongoDB 的某个集合中获取模板,渲染成对象供别的程序查询。或者将某个查询语句格式化为模板,存入到 MongoDB 的集合中。你帮我实现代码。存取操作用 beanie ODM 语法,但是加载和渲染模板文件是外部程序调用,外部程序应该是 pymongo 原生语句来调用渲染后的对象。 存储模板文件的字段可以是: name 、 说明、模板字符串 ,或者有什么字段?你来决定吧。请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。

当前数据库管理文件,你应该不需要修改这个文件,你直接从外部导入即可:

@/src\manager\core\db_mongo.py

mongodb shell 语法

import asyncio
from datetime import datetime
from typing import Any, Optional, Dict, List
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import ForeignKeyConstraint
from pydantic import BaseModel,Field
from beanie import Document, Indexed, init_beanie,Link
from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
from .config_model import UserConfig,AIPromptConfig

class ProductBaseInfo(BaseModel):
    """产品基本信息
产品名称	电线保护套
包装内容	一个/袋
材质	TPU
颜色	三种颜色:黑色、银色、金色
尺寸	直径6MM,长度93-95CM
包裹尺寸	14*17*3CM
重量	30G
主要用途	保护电源线,车匝线,刹车线
主要卖点
- 优质TPU,健康环保好看,耐磨耐脏
- 耐高温防火阻燃,
- 预防线管老化,延长线路使用寿命
- 柔软易弯曲,可卷起来收纳
    """
    name: Optional[str] = None
    content: Optional[str] = None
    material: Optional[str] = None
    color: Optional[str] = None
    size: Optional[str] = None
    packaging_size: Optional[str] = None
    weight: Optional[str] = None
    main_usage: Optional[str] = None
    selling_point: Optional[List[str]] = None

# 价格成本专用表(与变体一对一关系)
class Variant(BaseModel):
    name: Optional[str] = None
    # 核心财务字段(结构化存储)
    base_price: Optional[float] = None
    commission_rate: Optional[float] = None
    fba_fee: Optional[float] = None
    cost_rmb: Optional[float] = None
    logistics_cost: Optional[float] = None

class TrafficKeywordResult(BaseModel):
    """流量关键词结果"""
    traffic_keyword: Optional[str] = Field(
        default=None,
        description="竞品同类相似的关键词名称"
    )
    keyword_link: Optional[str] = Field(
        default=None,
        description="关键词详情页链接"
    )
    monthly_searches: Optional[str] = Field(
        default=None,
        description="竞品同类相似关键词的月搜索量"
    )
    amazon_search_link: Optional[str] = Field(
        default=None,
        description="亚马逊搜索链接"
    )

class ProductImageInfo(BaseModel):
    """产品图片信息"""
    image_url: Optional[str] = Field(
        default=None,
        description="产品图片URL"
    )
    goto_amazon: Optional[str] = Field(
        default=None,
        description="跳转亚马逊链接"
    )
    main_text: Optional[str] = Field(
        default=None,
        description="该 asin 商品的主要文字内容"
    )
    img_path: Optional[str] = Field(
        default=None,
        description="本地图片存储路径"
    )

class ExtraResultModel(BaseModel):
    """额外结果数据模型"""
    result_table: Optional[List[TrafficKeywordResult]] = None
    product_info: Optional[ProductImageInfo] = None
    unique_words: Optional[List[str]] = None

class CompetitorCrawlData(BaseModel):
    '''竞品表'''
    sql_id: Optional[int] = Field(default=None)
    asin: Optional[str] = Field(default=None, description="竞品商品的编号")
    asin_area: Optional[str] = 'JP'
    # 爬取数据的 S3 路径
    extra_result_path: Optional[str] = None
    extra_result: Optional[ExtraResultModel] = None
    mhtml_path: Optional[str] = None
    error: Optional[str] = None
    created_at: Optional[datetime] = Field(default_factory=datetime.now)

class SearchAmazoneKeyResult(BaseModel):
    search_key:Optional[str] = None
    suggestions:List[str] = []
    mhtml_path:Optional[str] = None
    screenshot:Optional[str] = None
    error:Optional[int] = None
    msg:Optional[str] = None
    created_at:Optional[datetime] = Field(default_factory=datetime.now)

# 产品主表(核心实体)
class Product(Document):
    basic_info: Optional[ProductBaseInfo] = Field(
        default=None,
        description="产品基本信息"
    )
    competitor_crawl_data: Optional[Dict[str, CompetitorCrawlData]] = Field(
        default={},  # 明确设置默认值为 None
        description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。")
    # 变体,如版本、型号、颜色、套餐,各个变体对应着价格、成本等财务数据
    variants: Optional[List[Variant]] = Field(
        default=None,
        description="产品变体信息,使用JSONB存储。如:{'color':['黑色','金色'],'size':['1m','3m']}" 
    )
    created_at: Optional[datetime] = Field(default_factory=datetime.now)
    updated_at: Optional[datetime] = Field(default=None)


async def example_usage():
    """更新后的示例调用代码"""
    db_manager = BaseMongoManager()
    await db_manager.initialize()
    
    # 获取产品及用户配置
    product = await Product.find_one(Product.basic_info.name == "电线保护套")
    logger.info(f"{product.basic_info}")

ODM 方式 product.basic_info 仅仅打印了该字段的内容,如果不用 python ,而是在 MongoDB Compass 客户端,原生的 query JSON 语句,如何仅仅打印该内容?

MongoDB 字段的筛选与投影,保存到配置文件

我要做一个 AI 运营数据分析和生成。

我用到的框架是 llamaindex ,它已经有完善的接口和请求响应的接口。我只需要从 MongoDB 中准备数据,构造成提示词,用 llamaindex 框架发送给不同的 AI 模型,然后把结果保存到 MongoDB 中。

AI 推理的效果受到各种因素影响,例如不同的模型、不同的提示词、不同的数据内容、使用非 llamaindex 的框架例如 Agent 、 Deep search SDK 或 API 接口。。。因此我希望可以使用多个 AI 模型或者多种数据多种方式来推理市场营销关键词文案和竞品关键词生成,每次推理的结果都保存到 MongoDB 文档的集合 AIExecutionRecords 中。

虽然目前只有"市场营销字段和竞品关键词"这两个字段需要AI推理出结果,但是未来可能会有更多的字段需要分析。例如用户需求分析、产品评论、同行卖点等。

目前我定义好了以下文件和数据模型:

@/src\models\ai_execution_record.py

@/src\models\product_model.py

@/src/manager/core/db_mongo.py

功能概述,我的场景:

  • UserConfig 用户配置集合中,通过外部配置好了。你直接读取即可获得提示词和有关配置参数。
  • product 在别的程序中已经把有关数据存好了,你读取即可解析到想要的内容。
  • 我需要自己传参决定用什么模型,什么提示词信息,什么上下文数据或格式化的字段,相关数据存储在数据库 Product 集合 和 Userconfig 集合。或者也可以通过外部调用传参的方式决定用什么提示词、用什么数据。
  • 我用 BaseAIExecution 及其子类来代表不同的 AI 任务,同一个任务也会有不同的模型名称,提示词,或者第三方API 智能体,只有这样才能总和对比各个任务执行效果。例如 MarketingContentGeneration 营销文案生成器

目前我定义好了以下文件和数据模型:

@/src\models\ai_execution_record.py

@/src\models\product_model.py

@/src/manager/core/db_mongo.py

这是一个参考文件,实现了 _prepare_competitor_prompt 、 execute_marketing_analysis 、run_competitor_analysis ,但是参考文件某些地方不一定对。你需要结合你自己的架构来设计真正能运行的代码

@/src\ai\agent_product.py

针对上述场景,我想写一个专门用于筛选合适字段的代码,我应该利用 MongoDB 的字段筛选与投影功能,将查询语法保存到 MongoDB 的配置集合中,以后我从配置中读取语句直接执行即可获得我想要的数据。

通过查看 agent_product.py 文件,目前所需的 字段筛选只有 product.basic_info 和 competitor_data ,你帮我实现它,自己决定在哪里创建文件。不要修改 agent_product.py 示例文件。

我看到 beanie 或者 pymongo 可以执行查询语句,只需要传参 query_string 即可。现在不用 ODM 语法,你帮我写出筛选 product.basic_info 和 competitor_data 的语句,将代码写在 tests\mytest\models\t_mongo_filter.py 文件中

需要兼容不同的使用场景,千万不能硬编码,可能涉及到不同的方法、模板、格式化,请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。