from jinja2 import Environment, BaseLoader
import json
from bson import json_util
from src.manager.core.db_mongo import BaseMongoManager
import asyncio
from utils.logu import get_logger
from src.models.field_config import FieldConfig, get_field_descriptions
logger = get_logger('test')
class MongoAggregationTemplate:
def __init__(self):
self.env = Environment(loader=BaseLoader())
# 添加自定义过滤器
self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
def to_template_string(self, pipeline):
"""将聚合管道转换为模板字符串"""
return json.dumps(pipeline, default=json_util.default)
def render(self, template_str, context):
"""渲染模板字符串为可执行的聚合管道"""
template = self.env.from_string(template_str)
rendered = template.render(**context)
try:
return json.loads(rendered, object_hook=json_util.object_hook)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON after rendering: {e}\nRendered content:\n{rendered}")
async def filter_aggregate_demo(product_name="电线保护套"):
# 初始化模板处理器
template_processor = MongoAggregationTemplate()
# 定义聚合管道模板
filter_competior_by_name = [
{
'$match': {
'basic_info.name': "{{ product_name }}"
}
},
{
'$project': {
'competitor_crawl_data': 1
}
}, {
'$addFields': {
'competitors': {
'$objectToArray': '$competitor_crawl_data'
}
}
}, {
'$unwind': '$competitors'
}, {
'$project': {
'_id': 0,
'asin': '$competitors.k',
'product_info': {
'main_text': '$competitors.v.extra_result.product_info.main_text'
},
'result_table': {
'$map': {
'input': '$competitors.v.extra_result.result_table',
'as': 'item',
'in': {
'traffic_keyword': '$$item.traffic_keyword',
'monthly_searches': '$$item.monthly_searches'
}
}
}
}
}
]
# 将聚合管道转换为模板字符串
template_str = template_processor.to_template_string(filter_competior_by_name)
logger.info(f"Template string: {template_str}")
# 初始化数据库
db_manager = BaseMongoManager(db_name='test')
await db_manager.initialize()
db = db_manager.db
# 渲染模板
try:
pipeline = template_processor.render(template_str, {
"product_name": product_name,
# 可以添加更多变量
})
logger.info(f"Rendered pipeline: {json.dumps(pipeline, indent=2, ensure_ascii=False)}")
except ValueError as e:
logger.error(f"Template rendering failed: {e}")
return
# 执行聚合查询
result = await db.Product.aggregate(pipeline).to_list()
logger.info(json.dumps(result, ensure_ascii=False))
# 测试代码
if __name__ == "__main__":
asyncio.run(filter_aggregate_demo())
参考这个示例,如果我想实现将模板化从 MongoDB 的某个集合中获取模板,渲染成对象供别的程序查询。或者将某个查询语句格式化为模板,存入到 MongoDB 的集合中。你帮我实现代码。存取操作用 beanie ODM 语法,但是加载和渲染模板文件是外部程序调用,外部程序应该是 pymongo 原生语句来调用渲染后的对象。 存储模板文件的字段可以是: name 、 说明、模板字符串 ,或者有什么字段?你来决定吧。请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。
当前数据库管理文件,你应该不需要修改这个文件,你直接从外部导入即可:
@/src\manager\core\db_mongo.py
import asyncio
from datetime import datetime
from typing import Any, Optional, Dict, List
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import ForeignKeyConstraint
from pydantic import BaseModel,Field
from beanie import Document, Indexed, init_beanie,Link
from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
from .config_model import UserConfig,AIPromptConfig
class ProductBaseInfo(BaseModel):
"""产品基本信息
产品名称 电线保护套
包装内容 一个/袋
材质 TPU
颜色 三种颜色:黑色、银色、金色
尺寸 直径6MM,长度93-95CM
包裹尺寸 14*17*3CM
重量 30G
主要用途 保护电源线,车匝线,刹车线
主要卖点
- 优质TPU,健康环保好看,耐磨耐脏
- 耐高温防火阻燃,
- 预防线管老化,延长线路使用寿命
- 柔软易弯曲,可卷起来收纳
"""
name: Optional[str] = None
content: Optional[str] = None
material: Optional[str] = None
color: Optional[str] = None
size: Optional[str] = None
packaging_size: Optional[str] = None
weight: Optional[str] = None
main_usage: Optional[str] = None
selling_point: Optional[List[str]] = None
# 价格成本专用表(与变体一对一关系)
class Variant(BaseModel):
name: Optional[str] = None
# 核心财务字段(结构化存储)
base_price: Optional[float] = None
commission_rate: Optional[float] = None
fba_fee: Optional[float] = None
cost_rmb: Optional[float] = None
logistics_cost: Optional[float] = None
class TrafficKeywordResult(BaseModel):
"""流量关键词结果"""
traffic_keyword: Optional[str] = Field(
default=None,
description="竞品同类相似的关键词名称"
)
keyword_link: Optional[str] = Field(
default=None,
description="关键词详情页链接"
)
monthly_searches: Optional[str] = Field(
default=None,
description="竞品同类相似关键词的月搜索量"
)
amazon_search_link: Optional[str] = Field(
default=None,
description="亚马逊搜索链接"
)
class ProductImageInfo(BaseModel):
"""产品图片信息"""
image_url: Optional[str] = Field(
default=None,
description="产品图片URL"
)
goto_amazon: Optional[str] = Field(
default=None,
description="跳转亚马逊链接"
)
main_text: Optional[str] = Field(
default=None,
description="该 asin 商品的主要文字内容"
)
img_path: Optional[str] = Field(
default=None,
description="本地图片存储路径"
)
class ExtraResultModel(BaseModel):
"""额外结果数据模型"""
result_table: Optional[List[TrafficKeywordResult]] = None
product_info: Optional[ProductImageInfo] = None
unique_words: Optional[List[str]] = None
class CompetitorCrawlData(BaseModel):
'''竞品表'''
sql_id: Optional[int] = Field(default=None)
asin: Optional[str] = Field(default=None, description="竞品商品的编号")
asin_area: Optional[str] = 'JP'
# 爬取数据的 S3 路径
extra_result_path: Optional[str] = None
extra_result: Optional[ExtraResultModel] = None
mhtml_path: Optional[str] = None
error: Optional[str] = None
created_at: Optional[datetime] = Field(default_factory=datetime.now)
class SearchAmazoneKeyResult(BaseModel):
search_key:Optional[str] = None
suggestions:List[str] = []
mhtml_path:Optional[str] = None
screenshot:Optional[str] = None
error:Optional[int] = None
msg:Optional[str] = None
created_at:Optional[datetime] = Field(default_factory=datetime.now)
# 产品主表(核心实体)
class Product(Document):
basic_info: Optional[ProductBaseInfo] = Field(
default=None,
description="产品基本信息"
)
competitor_crawl_data: Optional[Dict[str, CompetitorCrawlData]] = Field(
default={}, # 明确设置默认值为 None
description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。")
# 变体,如版本、型号、颜色、套餐,各个变体对应着价格、成本等财务数据
variants: Optional[List[Variant]] = Field(
default=None,
description="产品变体信息,使用JSONB存储。如:{'color':['黑色','金色'],'size':['1m','3m']}"
)
created_at: Optional[datetime] = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = Field(default=None)
async def example_usage():
"""更新后的示例调用代码"""
db_manager = BaseMongoManager()
await db_manager.initialize()
# 获取产品及用户配置
product = await Product.find_one(Product.basic_info.name == "电线保护套")
logger.info(f"{product.basic_info}")
ODM 方式 product.basic_info 仅仅打印了该字段的内容,如果不用 python ,而是在 MongoDB Compass 客户端,原生的 query JSON 语句,如何仅仅打印该内容?
我要做一个 AI 运营数据分析和生成。
我用到的框架是 llamaindex ,它已经有完善的接口和请求响应的接口。我只需要从 MongoDB 中准备数据,构造成提示词,用 llamaindex 框架发送给不同的 AI 模型,然后把结果保存到 MongoDB 中。
AI 推理的效果受到各种因素影响,例如不同的模型、不同的提示词、不同的数据内容、使用非 llamaindex 的框架例如 Agent 、 Deep search SDK 或 API 接口。。。因此我希望可以使用多个 AI 模型或者多种数据多种方式来推理市场营销关键词文案和竞品关键词生成,每次推理的结果都保存到 MongoDB 文档的集合 AIExecutionRecords 中。
虽然目前只有"市场营销字段和竞品关键词"这两个字段需要AI推理出结果,但是未来可能会有更多的字段需要分析。例如用户需求分析、产品评论、同行卖点等。
目前我定义好了以下文件和数据模型:
@/src\models\ai_execution_record.py
@/src\models\product_model.py
@/src/manager/core/db_mongo.py
功能概述,我的场景:
目前我定义好了以下文件和数据模型:
@/src\models\ai_execution_record.py
@/src\models\product_model.py
@/src/manager/core/db_mongo.py
这是一个参考文件,实现了 _prepare_competitor_prompt 、 execute_marketing_analysis 、run_competitor_analysis ,但是参考文件某些地方不一定对。你需要结合你自己的架构来设计真正能运行的代码
@/src\ai\agent_product.py
针对上述场景,我想写一个专门用于筛选合适字段的代码,我应该利用 MongoDB 的字段筛选与投影功能,将查询语法保存到 MongoDB 的配置集合中,以后我从配置中读取语句直接执行即可获得我想要的数据。
通过查看 agent_product.py 文件,目前所需的 字段筛选只有 product.basic_info 和 competitor_data ,你帮我实现它,自己决定在哪里创建文件。不要修改 agent_product.py 示例文件。
我看到 beanie 或者 pymongo 可以执行查询语句,只需要传参 query_string 即可。现在不用 ODM 语法,你帮我写出筛选 product.basic_info 和 competitor_data 的语句,将代码写在 tests\mytest\models\t_mongo_filter.py 文件中
需要兼容不同的使用场景,千万不能硬编码,可能涉及到不同的方法、模板、格式化,请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。