Răsfoiți Sursa

新增MongoDB模板化查询集合和管理服务

mrh 11 luni în urmă
părinte
comite
50309ed886

+ 1 - 1
README.md

@@ -20,7 +20,7 @@ G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.
 $timestr = Get-Date -Format "yyyyMMdd_HH_mm_ss"
 echo $timestr
 # 恢复到 test 数据库
-G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/test" I:\eng\backup\mongo\20250327_01_04_08\amazone\Product.bson
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/test" "I:\eng\backup\mongo\20250327_01_04_08\amazone\Product.bson"
 
 G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/backup_amazone" --collection=Product G:\code\amazone\copywriting_production\output\temp
 mongoimport --uri="mongodb://sv-v2:27017" --collection=users --file=G:\code\amazone\copywriting_production\output\mongodb_backup\users.json

+ 308 - 0
docs/gpt/MongoDB筛选和投影.md

@@ -0,0 +1,308 @@
+# 模板化存取
+```python
+from jinja2 import Environment, BaseLoader
+import json
+from bson import json_util
+from src.manager.core.db_mongo import BaseMongoManager
+import asyncio
+from utils.logu import get_logger
+from src.models.field_config import FieldConfig, get_field_descriptions
+
+logger = get_logger('test')
+
+class MongoAggregationTemplate:
+    def __init__(self):
+        self.env = Environment(loader=BaseLoader())
+        # 添加自定义过滤器
+        self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
+    
+    def to_template_string(self, pipeline):
+        """将聚合管道转换为模板字符串"""
+        return json.dumps(pipeline, default=json_util.default)
+    
+    def render(self, template_str, context):
+        """渲染模板字符串为可执行的聚合管道"""
+        template = self.env.from_string(template_str)
+        rendered = template.render(**context)
+        
+        try:
+            return json.loads(rendered, object_hook=json_util.object_hook)
+        except json.JSONDecodeError as e:
+            raise ValueError(f"Invalid JSON after rendering: {e}\nRendered content:\n{rendered}")
+
+async def filter_aggregate_demo(product_name="电线保护套"):
+    # 初始化模板处理器
+    template_processor = MongoAggregationTemplate()
+    
+    # 定义聚合管道模板
+    filter_competior_by_name = [
+        {
+            '$match': {
+                'basic_info.name': "{{ product_name }}"
+            }  
+        },
+        {
+            '$project': {
+                'competitor_crawl_data': 1
+            }
+        }, {
+            '$addFields': {
+                'competitors': {
+                    '$objectToArray': '$competitor_crawl_data'
+                }
+            }
+        }, {
+            '$unwind': '$competitors'
+        }, {
+            '$project': {
+                '_id': 0, 
+                'asin': '$competitors.k', 
+                'product_info': {
+                    'main_text': '$competitors.v.extra_result.product_info.main_text'
+                }, 
+                'result_table': {
+                    '$map': {
+                        'input': '$competitors.v.extra_result.result_table', 
+                        'as': 'item', 
+                        'in': {
+                            'traffic_keyword': '$$item.traffic_keyword', 
+                            'monthly_searches': '$$item.monthly_searches'
+                        }
+                    }
+                }
+            }
+        }
+    ]
+    
+    # 将聚合管道转换为模板字符串
+    template_str = template_processor.to_template_string(filter_competior_by_name)
+    logger.info(f"Template string: {template_str}")
+    
+    # 初始化数据库
+    db_manager = BaseMongoManager(db_name='test')
+    await db_manager.initialize()
+    db = db_manager.db
+    
+    # 渲染模板
+    try:
+        pipeline = template_processor.render(template_str, {
+            "product_name": product_name,
+            # 可以添加更多变量
+        })
+        logger.info(f"Rendered pipeline: {json.dumps(pipeline, indent=2, ensure_ascii=False)}")
+    except ValueError as e:
+        logger.error(f"Template rendering failed: {e}")
+        return
+    
+    # 执行聚合查询
+    result = await db.Product.aggregate(pipeline).to_list()
+    logger.info(json.dumps(result, ensure_ascii=False))
+
+# 测试代码
+if __name__ == "__main__":
+    asyncio.run(filter_aggregate_demo())
+
+```
+参考这个示例,如果我想实现将模板化从 MongoDB 的某个集合中获取模板,渲染成对象供别的程序查询。或者将某个查询语句格式化为模板,存入到 MongoDB 的集合中。你帮我实现代码。存取操作用 beanie ODM 语法,但是加载和渲染模板文件是外部程序调用,外部程序应该是 pymongo 原生语句来调用渲染后的对象。
+存储模板文件的字段可以是: name 、 说明、模板字符串 ,或者有什么字段?你来决定吧。请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。
+
+当前数据库管理文件,你应该不需要修改这个文件,你直接从外部导入即可: 
+
+@/src\manager\core\db_mongo.py 
+
+
+# mongodb shell 语法
+```python
+import asyncio
+from datetime import datetime
+from typing import Any, Optional, Dict, List
+from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy import ForeignKeyConstraint
+from pydantic import BaseModel,Field
+from beanie import Document, Indexed, init_beanie,Link
+from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
+from .config_model import UserConfig,AIPromptConfig
+
+class ProductBaseInfo(BaseModel):
+    """产品基本信息
+产品名称	电线保护套
+包装内容	一个/袋
+材质	TPU
+颜色	三种颜色:黑色、银色、金色
+尺寸	直径6MM,长度93-95CM
+包裹尺寸	14*17*3CM
+重量	30G
+主要用途	保护电源线,车匝线,刹车线
+主要卖点
+- 优质TPU,健康环保好看,耐磨耐脏
+- 耐高温防火阻燃,
+- 预防线管老化,延长线路使用寿命
+- 柔软易弯曲,可卷起来收纳
+    """
+    name: Optional[str] = None
+    content: Optional[str] = None
+    material: Optional[str] = None
+    color: Optional[str] = None
+    size: Optional[str] = None
+    packaging_size: Optional[str] = None
+    weight: Optional[str] = None
+    main_usage: Optional[str] = None
+    selling_point: Optional[List[str]] = None
+
+# 价格成本专用表(与变体一对一关系)
+class Variant(BaseModel):
+    name: Optional[str] = None
+    # 核心财务字段(结构化存储)
+    base_price: Optional[float] = None
+    commission_rate: Optional[float] = None
+    fba_fee: Optional[float] = None
+    cost_rmb: Optional[float] = None
+    logistics_cost: Optional[float] = None
+
+class TrafficKeywordResult(BaseModel):
+    """流量关键词结果"""
+    traffic_keyword: Optional[str] = Field(
+        default=None,
+        description="竞品同类相似的关键词名称"
+    )
+    keyword_link: Optional[str] = Field(
+        default=None,
+        description="关键词详情页链接"
+    )
+    monthly_searches: Optional[str] = Field(
+        default=None,
+        description="竞品同类相似关键词的月搜索量"
+    )
+    amazon_search_link: Optional[str] = Field(
+        default=None,
+        description="亚马逊搜索链接"
+    )
+
+class ProductImageInfo(BaseModel):
+    """产品图片信息"""
+    image_url: Optional[str] = Field(
+        default=None,
+        description="产品图片URL"
+    )
+    goto_amazon: Optional[str] = Field(
+        default=None,
+        description="跳转亚马逊链接"
+    )
+    main_text: Optional[str] = Field(
+        default=None,
+        description="该 asin 商品的主要文字内容"
+    )
+    img_path: Optional[str] = Field(
+        default=None,
+        description="本地图片存储路径"
+    )
+
+class ExtraResultModel(BaseModel):
+    """额外结果数据模型"""
+    result_table: Optional[List[TrafficKeywordResult]] = None
+    product_info: Optional[ProductImageInfo] = None
+    unique_words: Optional[List[str]] = None
+
+class CompetitorCrawlData(BaseModel):
+    '''竞品表'''
+    sql_id: Optional[int] = Field(default=None)
+    asin: Optional[str] = Field(default=None, description="竞品商品的编号")
+    asin_area: Optional[str] = 'JP'
+    # 爬取数据的 S3 路径
+    extra_result_path: Optional[str] = None
+    extra_result: Optional[ExtraResultModel] = None
+    mhtml_path: Optional[str] = None
+    error: Optional[str] = None
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+
+class SearchAmazoneKeyResult(BaseModel):
+    search_key:Optional[str] = None
+    suggestions:List[str] = []
+    mhtml_path:Optional[str] = None
+    screenshot:Optional[str] = None
+    error:Optional[int] = None
+    msg:Optional[str] = None
+    created_at:Optional[datetime] = Field(default_factory=datetime.now)
+
+# 产品主表(核心实体)
+class Product(Document):
+    basic_info: Optional[ProductBaseInfo] = Field(
+        default=None,
+        description="产品基本信息"
+    )
+    competitor_crawl_data: Optional[Dict[str, CompetitorCrawlData]] = Field(
+        default={},  # 明确设置默认值为 None
+        description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。")
+    # 变体,如版本、型号、颜色、套餐,各个变体对应着价格、成本等财务数据
+    variants: Optional[List[Variant]] = Field(
+        default=None,
+        description="产品变体信息,使用JSONB存储。如:{'color':['黑色','金色'],'size':['1m','3m']}" 
+    )
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+    updated_at: Optional[datetime] = Field(default=None)
+
+
+```
+
+```python
+async def example_usage():
+    """更新后的示例调用代码"""
+    db_manager = BaseMongoManager()
+    await db_manager.initialize()
+    
+    # 获取产品及用户配置
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    logger.info(f"{product.basic_info}")
+
+```
+ODM 方式 product.basic_info 仅仅打印了该字段的内容,如果不用 python ,而是在 MongoDB Compass 客户端,原生的 query JSON 语句,如何仅仅打印该内容?
+
+# MongoDB 字段的筛选与投影,保存到配置文件
+我要做一个 AI 运营数据分析和生成。
+
+我用到的框架是 llamaindex ,它已经有完善的接口和请求响应的接口。我只需要从 MongoDB 中准备数据,构造成提示词,用 llamaindex 框架发送给不同的 AI 模型,然后把结果保存到 MongoDB 中。
+
+AI 推理的效果受到各种因素影响,例如不同的模型、不同的提示词、不同的数据内容、使用非 llamaindex 的框架例如 Agent 、 Deep search SDK 或 API 接口。。。因此我希望可以使用多个 AI 模型或者多种数据多种方式来推理市场营销关键词文案和竞品关键词生成,每次推理的结果都保存到 MongoDB 文档的集合 AIExecutionRecords 中。
+
+虽然目前只有"市场营销字段和竞品关键词"这两个字段需要AI推理出结果,但是未来可能会有更多的字段需要分析。例如用户需求分析、产品评论、同行卖点等。
+
+目前我定义好了以下文件和数据模型:
+
+@/src\models\ai_execution_record.py 
+
+@/src\models\product_model.py 
+
+@/src/manager/core/db_mongo.py 
+
+
+
+功能概述,我的场景:
+- UserConfig 用户配置集合中,通过外部配置好了。你直接读取即可获得提示词和有关配置参数。
+- product 在别的程序中已经把有关数据存好了,你读取即可解析到想要的内容。
+- 我需要自己传参决定用什么模型,什么提示词信息,什么上下文数据或格式化的字段,相关数据存储在数据库 Product 集合 和 Userconfig 集合。或者也可以通过外部调用传参的方式决定用什么提示词、用什么数据。
+- 我用 BaseAIExecution 及其子类来代表不同的 AI 任务,同一个任务也会有不同的模型名称,提示词,或者第三方API 智能体,只有这样才能总和对比各个任务执行效果。例如 MarketingContentGeneration 营销文案生成器
+
+
+目前我定义好了以下文件和数据模型:
+
+@/src\models\ai_execution_record.py 
+
+@/src\models\product_model.py 
+
+@/src/manager/core/db_mongo.py 
+
+
+这是一个参考文件,实现了 _prepare_competitor_prompt 、 execute_marketing_analysis 、run_competitor_analysis ,但是参考文件某些地方不一定对。你需要结合你自己的架构来设计真正能运行的代码
+
+@/src\ai\agent_product.py 
+
+
+针对上述场景,我想写一个专门用于筛选合适字段的代码,我应该利用 MongoDB 的字段筛选与投影功能,将查询语法保存到 MongoDB 的配置集合中,以后我从配置中读取语句直接执行即可获得我想要的数据。
+
+通过查看 agent_product.py  文件,目前所需的 字段筛选只有 product.basic_info 和 competitor_data ,你帮我实现它,自己决定在哪里创建文件。不要修改 agent_product.py 示例文件。
+
+我看到 beanie 或者 pymongo 可以执行查询语句,只需要传参 query_string 即可。现在不用 ODM 语法,你帮我写出筛选 product.basic_info 和 competitor_data 的语句,将代码写在 tests\mytest\models\t_mongo_filter.py 文件中
+ 
+
+需要兼容不同的使用场景,千万不能硬编码,可能涉及到不同的方法、模板、格式化,请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。
+

+ 19 - 15
docs/gpt/ai_execute.md

@@ -1,34 +1,38 @@
 # 假设使用示例
-在ai_analysis模型的文件中,需要用到 AI 进行推理分析才能存放结果。由于运营分析是一个综合考虑的过程,因此我希望可以使用多个 AI 模型或者多种数据多种方式来推理分析市场营销字段和竞品字段分析,有时候同一段数据也会生成多次分析。比较每次是否有差异。然后把AI分析的结果统一保存到一个单独集合中。虽然目前只有这两个字段需要AI分析保存分析结果,但是未来可能会有更多的字段需要分析
+我要做一个 AI 运营数据分析和生成
 
-然后你为我生成了代码在 src\ai\ai_executor 目录,
+我用到的框架是 llamaindex ,它已经有完善的接口和请求响应的接口。我只需要从 MongoDB 中准备数据,构造成提示词,用 llamaindex 框架发送给不同的 AI 模型,然后把结果保存到 MongoDB 中。
 
-@/src\ai\ai_executor 
+AI 推理的效果受到各种因素影响,例如不同的模型、不同的提示词、不同的数据内容、使用非 llamaindex 的框架例如 Agent 、 Deep search SDK 或 API 接口。。。因此我希望可以使用多个 AI 模型或者多种数据多种方式来推理市场营销关键词文案和竞品关键词生成,每次推理的结果都保存到 MongoDB 文档的集合 AIExecutionRecords 中。
+
+虽然目前只有"市场营销字段和竞品关键词"这两个字段需要AI推理出结果,但是未来可能会有更多的字段需要分析。例如用户需求分析、产品评论、同行卖点等。
+
+目前我定义好了以下文件和数据模型:
 
 @/src\ai\run_service.py 
 
 @/src\models\ai_execution_record.py 
 
-这是你生成的架构文档:
+@/src/manager/core/db_mongo.py 
 
-@/docs\dev\ai_executor_module.md 
-
-我不了解你生成的代码,并且你生成代码不知道是否符合我的预期,我没有检查过。
+src\ai\run_service.py 是用来运行 ai_executor 模块里面的代码,这仅仅是一个示例的设想,你仅供参考。
 
-但是我的设想是 run_service.py  中的示例程序。对我来说模块里面是黑盒, execute_task 应该会调用 llamaindex 的相关方法并且请求获取结果。
+你为我生成代码在 src\ai\ai_executor 目录, execute_task 对我来说模块里面是黑盒, execute_task 应该会构造提示词,调用 llamaindex 的相关方法并且请求获取结果。或者由 execute_task 自动从 MongoDB 中读取所需数据。你来决定最佳设计。
 
-当我要分析市场营销信息时,我也会构造不同的 prompt。也可能使用不同的AI模型。未来也不仅仅要分析市场营销,还有很多别的场景可能也要分析。
-我会将每次的结果都保存到 MongoDB 中。
 
 功能概述,我的场景:
-- src\ai\ai_executor 就两个功能,提交数据 - llmaindex 请求大模型 - 返回数据存入 mongodb
-- 用户配置集合中,我获取了prompting,以作为不同提示词模板来看看AI分析结果有何改进。
-- 目前就需要分析分析市场营销文案、竞品和长尾词推理建议。可能未来会扩展不同的文档模型和字段,用于分析产品评论信息分析、用户需求分析。。。
+- UserConfig 用户配置集合中,通过外部配置好了。你直接读取即可获得提示词和有关配置参数。
+- product 在别的程序中已经把有关数据存好了,你读取即可解析到想要的内容。
 - 我需要自己传参决定用什么模型,什么提示词信息,什么上下文数据或格式化的字段,相关数据存储在数据库 Product 集合 和 Userconfig 集合。或者也可以通过外部调用传参的方式决定用什么提示词、用什么数据。
-- 我用 BaseAIExecution 及其子类来代表不同的 AI 任务,同一个任务也会有不同的模型名称,提示词,或者第三方API 智能体,只有这样才能总和对比各个任务执行效果。例如 MarketingContentGeneration 营销文案生成器
+- 我用 BaseAIExecution 及其子类来代表不同的 AI 任务,同一个任务也会有不同的模型名称,提示词,或者第三方API 智能体,只有这样才能总和对比各个任务执行效果。例如 MarketingContentGeneration 营销文案生成器
 
 
-ai_executor 模块目前有很多不符合我场景的地方。我并没有检查里面的代码,如果你觉得我 run_service.py 中的示例符合最佳实践架构,你帮我重构 ai_executor 里面的所有代码。如果文件不符合你最佳设计架构,请删除,或者文件名容易误解或者混淆,请重命名。该模块内部的任何文件你都可以删去或完全不保留原有架构。它对我来说是黑盒,我无所谓它具体实现。
+ai_executor 模块目前是空的。你先为我生成一个代码架构,然后你后续会根据架构来完成这个黑盒。
+
+这是一个参考文件,实现了 _prepare_competitor_prompt 、 execute_marketing_analysis 、run_competitor_analysis ,但是参考文件某些地方不一定对。你需要结合你自己的架构来设计真正能运行的代码
+
+@/src\ai\agent_product.py 
+
 
 需要兼容不同的使用场景,千万不能硬编码,可能涉及到不同的方法、模板、格式化,请善于利用多态、继承、组合等设计模式来优化代码结构。为了确保单一职责,你需要自己决定是否需要新建各个不同的文件来模块化。
 

+ 0 - 14
src/ai/ai_executor/__init__.py

@@ -1,14 +0,0 @@
-from .service import AIExecutorService
-from .analyzer import AITaskHandler
-from .analyzers import MarketingCopyGenerator, CompetitorAnalyzer
-from .factory import TaskHandlerFactory
-from models.ai_execution_record import AITaskType
-
-__all__ = [
-    "AIExecutorService",
-    "AITaskHandler",
-    "MarketingCopyGenerator",
-    "CompetitorAnalyzer",
-    "TaskHandlerFactory",
-    "AITaskType"
-]

+ 0 - 45
src/ai/ai_executor/analyzer.py

@@ -1,45 +0,0 @@
-from abc import ABC, abstractmethod
-from typing import Optional, Any, Type
-from pydantic import BaseModel
-from src.models.product_model import Product
-from models.ai_execution_record import AITaskType
-
-class AITaskHandler(ABC):
-    """AI任务处理器抽象接口"""
-    
-    @property
-    @abstractmethod
-    def task_type(self) -> AITaskType:
-        """返回任务类型"""
-        raise NotImplementedError
-
-    @abstractmethod
-    async def format_prompt(
-        self,
-        product: Product,
-        custom_template: Optional[str] = None
-    ) -> str:
-        """格式化提示词
-        Args:
-            product: 产品对象
-            custom_template: 自定义提示词模板
-        Returns:
-            格式化后的提示词字符串
-        """
-        raise NotImplementedError
-
-    @abstractmethod
-    def get_result_model(self) -> Type[BaseModel]:
-        """获取对应的结果模型类
-        Returns:
-            继承自BaseModel的结果模型类
-        """
-        raise NotImplementedError
-
-    @abstractmethod
-    def get_format_instructions(self) -> str:
-        """获取输出格式指令
-        Returns:
-            格式指令字符串
-        """
-        raise NotImplementedError

+ 0 - 69
src/ai/ai_executor/analyzers.py

@@ -1,69 +0,0 @@
-from typing import Optional, Type
-from pydantic import BaseModel
-from src.models.product_model import (
-    Product,
-    AICompetitorAnalyzeMainKeywordsResult,
-    MarketingInfo
-)
-from src.models.ai_execution_record import (
-    BaseAIExecution,
-    CompetitorKeywordAnalysis,
-    MarketingContentGeneration
-)
-from .analyzer import AITaskHandler
-
-class MarketingCopyGenerator(AITaskHandler):
-    """营销文案生成器"""
-    
-    async def format_prompt(
-        self,
-        product: Product,
-        custom_template: Optional[str] = None
-    ) -> str:
-        template = custom_template or (
-            "为{product_name}生成营销文案,要求:\n"
-            "- 突出卖点:{selling_points}\n"
-            "- 符合日本市场风格\n"
-            "- 包含标题和详细描述"
-        )
-        return template.format(
-            product_name=product.basic_info.name,
-            selling_points=", ".join(product.basic_info.selling_point or [])
-        )
-
-
-    def get_format_instructions(self) -> str:
-        return f"请使用JSON格式输出,符合以下结构:\n{MarketingInfo.schema_json(indent=2)}"
-
-class CompetitorKeywordAnalyzer(AITaskHandler):
-    """竞品关键词分析器"""
-    
-    async def format_prompt(
-        self,
-        product: Product,
-        custom_template: Optional[str] = None
-    ) -> str:
-        template = custom_template or (
-            "作为亚马逊运营专家,请分析以下产品数据并推荐关键词:\n"
-            "产品名称:{product_name}\n"
-            "竞品数据:{competitor_data}\n"
-            "输出要求:{format_instructions}"
-        )
-        format_params = {
-            "product_name": product.basic_info.name,
-            "competitor_data": self._format_competitor_data(product),
-            "format_instructions": self.get_format_instructions()
-        }
-        return template.format(**format_params)
-
-    def _format_competitor_data(self, product: Product) -> str:
-        return "\n".join([
-            f"ASIN: {asin}\nMain Text: {data.extra_result.product_info.main_text}"
-            for asin, data in product.competitor_crawl_data.items()
-        ])
-
-    def get_result_model(self) -> Type[BaseModel]:
-        return CompetitorKeywordAnalysis
-
-    def get_format_instructions(self) -> str:
-        return f"请使用JSON格式输出,符合以下结构:\n{AICompetitorAnalyzeMainKeywordsResult.schema_json(indent=2)}"

+ 0 - 26
src/ai/ai_executor/factory.py

@@ -1,26 +0,0 @@
-from typing import Dict, Type
-from .analyzer import AITaskHandler
-from .analyzers import MarketingCopyGenerator, CompetitorKeywordAnalyzer
-
-class TaskHandlerFactory:
-    """AI任务处理器工厂"""
-    
-    _handler_map: Dict[str, Type[AITaskHandler]] = {
-        "MarketingCopyGenerator": MarketingCopyGenerator,
-        "CompetitorKeywordAnalysis": CompetitorKeywordAnalyzer
-    }
-
-    @classmethod
-    def create_handler(cls, task_type: str) -> AITaskHandler:
-        """创建任务处理器实例
-        Args:
-            task_type: 任务类名字符串
-        Returns:
-            任务处理器实例
-        Raises:
-            ValueError: 当任务类型不支持时
-        """
-        handler_class = cls._handler_map.get(task_type)
-        if handler_class is None:
-            raise ValueError(f"不支持的任务类: {task_type}")
-        return handler_class()

+ 0 - 99
src/ai/ai_executor/service.py

@@ -1,99 +0,0 @@
-from typing import Optional, List
-from litellm import Type, acompletion
-from beanie import init_beanie
-from pydantic import BaseModel, ValidationError
-from motor.motor_asyncio import AsyncIOMotorClient
-from config.settings import MONGO_URL, MONGO_DB_NAME
-from src.models.product_model import Product
-from models.ai_execution_record import BaseAIExecution
-from src.models.config_model import UserConfig
-from src.manager.core.db_mongo import BaseMongoManager
-from utils.logu import get_logger
-from .factory import TaskHandlerFactory
-import asyncio
-
-logger = get_logger("ai_executor")
-
-class AIExecutorService:
-    """AI执行服务"""
-    
-    def __init__(self, product: Product):
-        self.product = product
-    
-    async def execute_task(
-        self,
-        result_class: Type[BaseAIExecution],
-        model_name: str,
-        custom_prompt: Optional[str] = None
-    ) -> BaseAIExecution:
-        """
-        执行AI任务并存储结果
-        
-        参数:
-            result_class: 结果类(如CompetitorKeywordAnalysis)
-            model_name: 模型名称(如"gpt-3.5-turbo")
-            custom_prompt: 自定义提示词模板
-            
-        返回:
-            保存到MongoDB的执行结果文档
-        """
-        handler = TaskHandlerFactory.create_handler(result_class.__fields__["task_type"].default)
-        
-        try:
-            # 1. 准备提示词
-            prompt = await handler.format_prompt(self.product, custom_prompt)
-            logger.info(f"使用提示词: {prompt}")
-            
-            # 2. 调用LLM
-            response = await acompletion(
-                model=model_name,
-                messages=[{"role": "user", "content": prompt}]
-            )
-            result_data = response.choices[0].message.content
-            
-            # 3. 解析结果
-            result_model = handler.get_result_model()
-            execution_result = result_model.parse_raw(result_data)
-            
-            # 4. 创建并存储文档
-            execution_doc = result_class(
-                product=self.product,
-                model_name=model_name,
-                prompt_template=custom_prompt,
-                input_data={"prompt": prompt},
-                output_data=execution_result.dict(),
-                analysis_result=execution_result
-            )
-            await execution_doc.insert()
-            
-            return execution_doc
-            
-        except ValidationError as e:
-            logger.error(f"结果验证失败: {str(e)}")
-            raise ValueError("结果格式不正确") from e
-        except Exception as e:
-            logger.error(f"执行过程中发生错误: {str(e)}")
-            raise
-    
-    async def execute_multi_model_task(
-        self,
-        result_class: Type[BaseAIExecution],
-        model_names: List[str],
-        custom_prompt: Optional[str] = None
-    ) -> List[BaseAIExecution]:
-        """
-        使用多个模型并行执行任务
-        
-        参数:
-            result_class: 结果类
-            model_names: 模型名称列表
-            custom_prompt: 自定义提示词模板
-            
-        返回:
-            执行结果文档列表
-        """
-        tasks = [
-            self.execute_task(result_class, model_name, custom_prompt)
-            for model_name in model_names
-        ]
-        return await asyncio.gather(*tasks)

+ 0 - 116
src/ai/ai_executor/strategies.py

@@ -1,116 +0,0 @@
-from typing import Optional, Type
-from abc import ABC, abstractmethod
-from pydantic import BaseModel
-from beanie import Document
-from enum import Enum
-from src.models.product_model import Product, AICompetitorAnalyzeMainKeywordsResult, MarketingInfo
-from models.ai_execution_record import BaseAIExecution,CompetitorKeywordAnalysis, MarketingContentGeneration
-
-class AnalysisStrategy(ABC):
-    """分析策略抽象基类"""
-    
-    @property
-    @abstractmethod
-    def analysis_type(self):
-        """关联的分析类型"""
-        pass
-    
-    @property
-    @abstractmethod
-    def default_template(self) -> str:
-        """默认提示词模板"""
-        pass
-    
-    @abstractmethod
-    async def format_prompt(self, product: Product, custom_template: Optional[str] = None) -> str:
-        """格式化提示词"""
-        pass
-    
-    @abstractmethod
-    def get_model_class(self) -> Type[BaseAnalysis]:
-        """获取对应的文档模型类"""
-        pass
-    
-    @abstractmethod
-    def get_format_instructions(self) -> str:
-        """获取格式指令"""
-        pass
-
-class AnalysisType(Enum):
-    """分析类型枚举,集成策略实现"""
-    
-    @property
-    def strategy(self):
-        """获取对应的策略实例"""
-        return self.value
-    
-    class MARKETING(AnalysisStrategy):
-        """营销分析策略"""
-        
-        @property
-        def analysis_type(self):
-            return AnalysisType.MARKETING
-        
-        @property
-        def default_template(self) -> str:
-            return (
-                "为{product_name}生成营销文案,要求:\n"
-                "- 突出卖点:{selling_points}\n"
-                "- 符合日本市场风格\n"
-                "- 包含标题和详细描述"
-            )
-        
-        async def format_prompt(self, product: Product, custom_template: Optional[str] = None) -> str:
-            template = custom_template or self.default_template
-            return template.format(
-                product_name=product.basic_info.name,
-                selling_points=", ".join(product.basic_info.selling_point or [])
-            )
-        
-        def get_model_class(self) -> Type[BaseAnalysis]:
-            return AIAnalysisMarketing
-        
-        def get_format_instructions(self) -> str:
-            return f"请使用JSON格式输出,符合以下结构:\n{MarketingInfo.schema_json(indent=2)}"
-    
-    class COMPETITOR(AnalysisStrategy):
-        """竞品分析策略"""
-        
-        @property
-        def analysis_type(self):
-            return AnalysisType.COMPETITOR
-        
-        @property
-        def default_template(self) -> str:
-            return (
-                "作为亚马逊运营专家,请分析以下产品数据并推荐关键词:\n"
-                "产品名称:{product_name}\n"
-                "竞品数据:{competitor_data}\n"
-                "输出要求:{format_instructions}"
-            )
-        
-        async def format_prompt(self, product: Product, custom_template: Optional[str] = None) -> str:
-            template = custom_template or self.default_template
-            format_params = {
-                "product_name": product.basic_info.name,
-                "competitor_data": self._format_competitor_data(product),
-                "format_instructions": self.get_format_instructions()
-            }
-            return template.format(**format_params)
-        
-        def _format_competitor_data(self, product: Product) -> str:
-            return "\n".join([
-                f"ASIN: {asin}\nMain Text: {data.extra_result.product_info.main_text}"
-                for asin, data in product.competitor_crawl_data.items()
-            ])
-        
-        def get_model_class(self) -> Type[BaseAnalysis]:
-            return AIAnalysisCompetitor
-        
-        def get_format_instructions(self) -> str:
-            return f"请使用JSON格式输出,符合以下结构:\n{AICompetitorAnalyzeMainKeywordsResult.schema_json(indent=2)}"
-
-# 更新枚举值
-AnalysisType.MARKETING = AnalysisType.MARKETING()
-AnalysisType.COMPETITOR = AnalysisType.COMPETITOR()
-AnalysisType.SALES = None  # 预留扩展

+ 12 - 14
src/ai/run_service.py

@@ -1,46 +1,44 @@
 from src.models.product_model import Product
 from src.models.config_model import UserConfig
 from src.models.ai_execution_record import (
-    CompetitorKeywordAnalysis, MarketingContentGeneration,LLMConfig,
+    CompetitorKeywordAnalysis, MarketingContentGeneration, LLMConfig,
     AICompetitorAnalyzeMainKeywordsResult,
     MarketingInfo,)
 from src.manager.core.db_mongo import BaseMongoManager
+from src.ai.ai_executor import TaskRunner
 import asyncio
 from utils.logu import get_logger
-from src.models.field_config import FieldConfig,get_field_descriptions
+from src.models.field_config import FieldConfig, get_field_descriptions
 from dotenv import load_dotenv
 load_dotenv()
 logger = get_logger('ai')
 
 async def example_usage():
-    """示例调用代码"""
+    """更新后的示例调用代码"""
     db_manager = BaseMongoManager()
     await db_manager.initialize()
     
     # 获取产品及用户配置
     product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    user = await UserConfig.find_one(UserConfig.user_name == "test_user")
     llm_config = LLMConfig(model_name="openai/deepseek-chat")
+    
+    # 竞品分析任务
     competitor_model = CompetitorKeywordAnalysis(
         product=product, 
         executor_config=llm_config,
         prompting=user.prompting.get("竞品和长尾词分析")
     )
-    # 使用单个模型执行任务
-    # .... 执行某些代码 ... ,从 src\ai\ai_executor 模块中调用
-    # 执行完毕后,自动被赋值结果: competitor_model.result: AICompetitorAnalyzeMainKeywordsResult
-    logger.info(f"{competitor_model.result}")
+    await task_runner.run_task(competitor_model)
+    logger.info(f"竞品分析结果: {competitor_model.result}")
     
-    marketing_prompting = user.prompting.get("营销文案")
+    # 营销文案生成任务
     marketing_model = MarketingContentGeneration(
         product=product,
         executor_config=llm_config,
-        prompting=marketing_prompting, 
+        prompting=user.prompting.get("营销文案"), 
     )
-    # 使用单个模型执行任务
-    # .... 执行某些代码 ... ,从 src\ai\ai_executor 模块中调用
-    # marketing_model.result: MarketingInfo
-    logger.info(f"{marketing_model.result}")
+    await task_runner.run_task(marketing_model)
+    logger.info(f"营销文案结果: {marketing_model.result}")
 
 if __name__ == "__main__":
     asyncio.run(example_usage())

+ 3 - 1
src/manager/core/db_mongo.py

@@ -9,7 +9,8 @@ from config.settings import MONGO_URL, MONGO_DB_NAME
 from src.models.product_model import Product
 from beanie.operators import Set, Rename
 from src.models.config_model import UserConfig, AIPromptConfig
-from models.ai_execution_record import BaseAIExecution, MarketingContentGeneration
+from src.models.ai_execution_record import BaseAIExecution, MarketingContentGeneration
+from src.models.template_model import Template
 class BaseMongoManager:
     _instance = None
     _init_lock = asyncio.Lock()
@@ -43,6 +44,7 @@ class BaseMongoManager:
                         Product,
                         UserConfig,
                         BaseAIExecution,
+                        Template,
                     ])
                 self._is_initialized = True
 

+ 146 - 0
src/manager/template_manager.py

@@ -0,0 +1,146 @@
+from typing import Optional, List, Dict, Any
+from src.models.template_model import Template, TemplateType
+from src.manager.core.db_mongo import BaseMongoManager
+from jinja2 import Environment, BaseLoader
+import json
+from bson import json_util
+from utils.logu import get_logger
+import asyncio
+
+logger = get_logger('template')
+
+class TemplateManager(BaseMongoManager):
+    def __init__(self):
+        super().__init__()
+        self.env = Environment(loader=BaseLoader())
+        self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
+
+    async def initialize(self):
+        await super().initialize()
+        
+    async def create_template(self, name: str, template_str: str,
+                            template_type: TemplateType,
+                            description: str = None) -> Template:
+        """创建新模板"""
+        template = Template(
+            name=name,
+            template_str=template_str,
+            template_type=template_type,
+            description=description
+        )
+        await template.insert()
+        return template
+
+    async def create_or_update_template(self, name: str, template_str: str,
+                                     template_type: TemplateType,
+                                     description: str = None,
+                                     if_exists: str = "update") -> Template:
+        """
+        创建模板,如果已存在则根据if_exists参数处理
+        Args:
+            name: 模板名称
+            template_str: 模板字符串
+            template_type: 模板类型
+            description: 模板描述
+            if_exists: 存在时的处理方式 ('update'或'ignore',默认为'update')
+        Returns:
+            创建或更新后的模板对象
+        Raises:
+            ValueError: 如果if_exists参数值无效
+        """
+        existing = await self.get_template(name)
+        if existing:
+            if if_exists == "ignore":
+                return existing
+            elif if_exists == "update":
+                return await self.update_template(
+                    name=name,
+                    new_template_str=template_str,
+                    new_description=description
+                )
+            else:
+                raise ValueError(f"Invalid if_exists value: {if_exists}. Must be 'update' or 'ignore'")
+        return await self.create_template(
+            name=name,
+            template_str=template_str,
+            template_type=template_type,
+            description=description
+        )
+
+    async def get_template(self, name: str) -> Optional[Template]:
+        """根据名称获取模板"""
+        await self._ensure_initialized()
+        return await Template.find_one(Template.name == name)
+
+    async def update_template(self, name: str, new_template_str: str = None,
+                            new_description: str = None) -> Optional[Template]:
+        """更新模板"""
+        template = await self.get_template(name)
+        if not template:
+            return None
+            
+        if new_template_str:
+            template.template_str = new_template_str
+        if new_description:
+            template.description = new_description
+            
+        await template.update_timestamp()
+        return template
+
+    async def delete_template(self, name: str) -> bool:
+        """删除模板"""
+        template = await self.get_template(name)
+        if template:
+            await template.delete()
+            return True
+        return False
+
+    async def list_templates(self, template_type: TemplateType = None) -> List[Template]:
+        """列出所有模板,可过滤类型"""
+        await self._ensure_initialized()
+        if template_type:
+            return await Template.find(Template.template_type == template_type).to_list()
+        return await Template.all().to_list()
+
+    def render_template(self, template_str: str, context: Dict[str, Any]):
+        """渲染模板字符串"""
+        template = self.env.from_string(template_str)
+        rendered = template.render(**context)
+        
+        try:
+            return json.loads(rendered, object_hook=json_util.object_hook)
+        except json.JSONDecodeError as e:
+            logger.error(f"Template rendering failed: {e}\nRendered content:\n{rendered}")
+            raise ValueError(f"Invalid JSON after rendering: {e}")
+
+    async def execute_template(self, name: str, context: Dict[str, Any], 
+                             collection_name: str = None):
+        """执行模板查询"""
+        template = await self.get_template(name)
+        if not template:
+            raise ValueError(f"Template '{name}' not found")
+            
+        pipeline = self.render_template(template.template_str, context)
+        
+        if not collection_name:
+            if template.template_type == TemplateType.AGGREGATION:
+                collection_name = "Product"  # 默认集合
+            
+        if not collection_name:
+            raise ValueError("Collection name is required for this template type")
+            
+        return await self.db[collection_name].aggregate(pipeline).to_list()
+
+class TemplateService:
+    """供外部程序使用的服务类"""
+    _instance = None
+    _lock = asyncio.Lock()
+    
+    @classmethod
+    async def get_instance(cls):
+        if cls._instance is None:
+            async with cls._lock:
+                if cls._instance is None:
+                    cls._instance = TemplateManager()
+                    await cls._instance.initialize()
+        return cls._instance

+ 37 - 1
src/models/ai_execution_record.py

@@ -2,7 +2,7 @@ from datetime import datetime
 from beanie import Document, Link
 from pydantic import BaseModel, Field
 from typing import Any, List, Dict, Optional, Union
-from src.models.product_model import Product,AICompetitorAnalyzeMainKeywordsResult,MarketingInfo,AIPromptConfig
+from src.models.product_model import Product,SearchAmazoneKeyResult,AIPromptConfig
 
 class LLMConfig(BaseModel):
     model_name: str = Field(
@@ -53,6 +53,33 @@ class BaseAIExecution(Document):
         name = "AIExecutionRecords"
 
 
+class AICompetitorAnalyzeMainKeywords(BaseModel):
+
+    asin:str = Field(default=None, description="商品(竞品)编号")
+    main_key:str = Field(default=None, description="主要关键词")
+    monthly_searches: Optional[int] = Field(default=None, description="月搜索量")
+    reason:Optional[str] = Field(default=None, description="选择该主关键词原因")
+    crawl_result: Optional[SearchAmazoneKeyResult] = Field(
+        default=None,
+        description="爬取AI分析出来的主关键词" 
+    )
+    created_at:Optional[datetime] = Field(default_factory=datetime.now)
+
+class AICompetitorAnalyzeMainKeywordsResult(BaseModel):
+    class TailKey(BaseModel):
+        tail_key:str = Field(default=None, description="长尾关键词")
+        monthly_searches:Optional[int] = Field(default=None, description="月搜索量")
+        reason:Optional[str] = Field(default=None, description="选择该长尾关键词原因")
+        
+    results:Optional[List[AICompetitorAnalyzeMainKeywords]] = []
+    supplement:Optional[str] = Field(
+        default=None,
+        description="补充说明,非必填。如果你有额外的信息或建议,可以在这里添加。"
+    )
+    tail_keys: Optional[List[TailKey]] = Field(
+        default=[],
+    )
+
 class CompetitorKeywordAnalysis(BaseAIExecution):
     """竞品关键词分析结果"""
     task_type: str = "competitor_analysis"
@@ -61,6 +88,15 @@ class CompetitorKeywordAnalysis(BaseAIExecution):
         description="竞品关键词分析结果"
     )
 
+class MarketingInfo(BaseModel):
+    """营销信息"""
+    title: Optional[str] = None
+    st_search: Optional[str] = None
+    selling_point: Optional[List[str]] = None
+    product_introduction: Optional[str] = None
+
+
+
 class MarketingContentGeneration(BaseAIExecution):
     """营销内容生成结果"""
     task_type: str = "marketing_generation"

+ 0 - 71
src/models/product_model.py

@@ -8,13 +8,6 @@ from beanie import Document, Indexed, init_beanie,Link
 from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
 from .config_model import UserConfig,AIPromptConfig
 
-class MarketingInfo(BaseModel):
-    """营销信息"""
-    title: Optional[str] = None
-    st_search: Optional[str] = None
-    selling_point: Optional[List[str]] = None
-    product_introduction: Optional[str] = None
-
 class ProductBaseInfo(BaseModel):
     """产品基本信息
 产品名称	电线保护套
@@ -116,79 +109,15 @@ class SearchAmazoneKeyResult(BaseModel):
     msg:Optional[str] = None
     created_at:Optional[datetime] = Field(default_factory=datetime.now)
 
-
-class AICompetitorAnalyzeMainKeywords(BaseModel):
-
-    asin:str = Field(default=None, description="商品(竞品)编号")
-    main_key:str = Field(default=None, description="主要关键词")
-    monthly_searches: Optional[int] = Field(default=None, description="月搜索量")
-    reason:Optional[str] = Field(default=None, description="选择该主关键词原因")
-    crawl_result: Optional[SearchAmazoneKeyResult] = Field(
-        default=None,
-        description="爬取AI分析出来的主关键词" 
-    )
-    created_at:Optional[datetime] = Field(default_factory=datetime.now)
-
-class AICompetitorAnalyzeMainKeywordsResult(BaseModel):
-    class TailKey(BaseModel):
-        tail_key:str = Field(default=None, description="长尾关键词")
-        monthly_searches:Optional[int] = Field(default=None, description="月搜索量")
-        reason:Optional[str] = Field(default=None, description="选择该长尾关键词原因")
-        
-    results:Optional[List[AICompetitorAnalyzeMainKeywords]] = []
-    supplement:Optional[str] = Field(
-        default=None,
-        description="补充说明,非必填。如果你有额外的信息或建议,可以在这里添加。"
-    )
-    tail_keys: Optional[List[TailKey]] = Field(
-        default=[],
-    )
-
-class AIAnalyzeCompare(BaseModel):
-    model:str = Field(default=None, description="模型名称")
-    competitor_template: Optional[AIPromptConfig] = Field(
-        default=None,
-        description="关联的竞品分析模板"
-    )
-    competitor_prompt:Optional[str] = Field(
-        default=None,
-        description="竞品分析最终提示词"
-    )
-    competitor_analyze: Optional[AICompetitorAnalyzeMainKeywordsResult] = Field(
-        default=AICompetitorAnalyzeMainKeywordsResult(),
-        description=""
-    )
-    marketing_template: Optional[AIPromptConfig] = Field(
-        default=None,
-        description="关联的营销模板"
-    )
-    marketing: Optional[MarketingInfo] = Field(
-        default=None,
-        description="营销信息,使用JSONB存储。卖点1、卖点2、产品介绍风格1、风格2。。。")
-    created_at:Optional[datetime] = Field(default_factory=datetime.now)
-    
-
 # 产品主表(核心实体)
 class Product(Document):
     basic_info: Optional[ProductBaseInfo] = Field(
         default=None,
         description="产品基本信息"
     )
-    marketing: Optional[MarketingInfo] = Field(
-        default=None, 
-        description="营销信息,使用JSONB存储。卖点1、卖点2、产品介绍风格1、风格2。。。")
-    
     competitor_crawl_data: Optional[Dict[str, CompetitorCrawlData]] = Field(
         default={},  # 明确设置默认值为 None
         description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。")
-    competitor_analyze: Optional[AICompetitorAnalyzeMainKeywordsResult] = Field(
-        default=AICompetitorAnalyzeMainKeywordsResult(),
-        description="" 
-    )
-    ai_analysis_compare: Optional[Dict[str, AIAnalyzeCompare]] = Field(
-        default=[],
-        description="不同大模型分析结果对比"
-    )
     # 变体,如版本、型号、颜色、套餐,各个变体对应着价格、成本等财务数据
     variants: Optional[List[Variant]] = Field(
         default=None,

+ 27 - 0
src/models/template_model.py

@@ -0,0 +1,27 @@
+from typing import Optional
+from beanie import Document
+from pydantic import BaseModel, Field
+from enum import Enum
+from datetime import datetime
+
+class TemplateType(str, Enum):
+    AGGREGATION = "aggregation"
+    QUERY = "query"
+    UPDATE = "update"
+
+class BaseTemplate(BaseModel):
+    name: str = Field(..., description="模板名称")
+    description: Optional[str] = Field(None, description="模板描述")
+    template_type: TemplateType = Field(..., description="模板类型")
+    template_str: str = Field(..., description="模板字符串")
+    created_at: datetime = Field(default_factory=datetime.now, description="创建时间")
+    updated_at: datetime = Field(default_factory=datetime.now, description="更新时间")
+
+class Template(Document, BaseTemplate):
+    class Settings:
+        name = "templates"
+        indexes = ["name", "template_type"]
+
+    async def update_timestamp(self):
+        self.updated_at = datetime.now()
+        await self.save()

+ 0 - 0
tests/ai/test_ai_executor.py


+ 1 - 1
tests/ai/test_parallel_analysis.py

@@ -1,6 +1,6 @@
 from src.ai.ai_executor.parallel_runner import ParallelAnalysisRunner
 from src.models.product_model import Product
-from models.ai_execution_record import AnalysisType, ModelProvider
+from src.models.ai_execution_record import AnalysisType, ModelProvider
 from src.models.config_model import UserConfig
 import pytest
 from src.manager.core.db_mongo import BaseMongoManager

+ 86 - 0
tests/mytest/models/t_beanie_odm_to_qurystring.py

@@ -0,0 +1,86 @@
+import json
+from src.models.product_model import Product
+from src.models.config_model import UserConfig
+from src.models.ai_execution_record import (
+    CompetitorKeywordAnalysis, MarketingContentGeneration, LLMConfig,
+    AICompetitorAnalyzeMainKeywordsResult,
+    MarketingInfo,)
+from src.manager.core.db_mongo import BaseMongoManager
+import asyncio
+from utils.logu import get_logger
+from src.models.field_config import FieldConfig, get_field_descriptions
+logger = get_logger('test')
+def get_competitor_prompt_data(
+    product: Product,
+    field_config: FieldConfig
+) -> list:
+    """
+    获取竞品提示数据
+    
+    Args:
+        product: 产品对象
+        field_config: 字段配置
+        
+    Returns:
+        结构化竞品数据列表
+    """
+    competitor_crawl_data = product.competitor_crawl_data
+    list_data = []
+    
+    for asin, crawl_data in competitor_crawl_data.items():
+        if crawl_data.extra_result:
+            structured_result = {"asin": asin}
+            
+            if crawl_data.extra_result.product_info:
+                structured_result["product_info"] = field_config.filter_model_dump(
+                    crawl_data.extra_result.product_info,
+                    "ProductImageInfo"
+                )
+                
+            if crawl_data.extra_result.result_table:
+                structured_result["result_table"] = [
+                    field_config.filter_model_dump(item, "TrafficKeywordResult")
+                    for item in crawl_data.extra_result.result_table
+                ]
+                
+            logger.debug(f"Structured result for LLM: {json.dumps(structured_result, indent=4, ensure_ascii=False)}")
+            list_data.append(structured_result)
+            
+    return list_data
+async def example_usage():
+    """更新后的示例调用代码"""
+    db_manager = BaseMongoManager()
+    await db_manager.initialize()
+    db = db_manager.db
+    # 获取产品及用户配置
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    # logger.info(f"{product.basic_info}")
+    # logger.info(f"{product.competitor_crawl_data}")
+    competitor_crawl_data = get_competitor_prompt_data(product, FieldConfig(include_fields={
+            "ProductImageInfo": {"main_text"},
+            "TrafficKeywordResult": {"traffic_keyword", "monthly_searches"},
+            "ProductBaseInfo": {
+                "name", "content", "material", "color", "size",
+                "packaging_size", "weight", "main_usage", "selling_point"
+            },
+            "CompetitorCrawlData": {"asin"}
+        }))
+    logger.info(f"{json.dumps(competitor_crawl_data, indent=4, ensure_ascii=False)}")
+    # 必须添加 await 执行查询
+    # res = await Product.find_one(
+    #     { "basic_info.name": "电线保护套" },
+    #     { "basic_info": 1, "_id": 0 }
+    # )
+    # logger.info(f"{res}")  # 现在会打印文档内容
+#     collection = db['Product']
+#     result = await collection.find_one(
+#     { "basic_info.name": "电线保护套" },
+#     { "basic_info": 1, "_id": 0 }  # 投影
+# )
+    # print(result)
+
+def main():
+    asyncio.run(example_usage())
+
+if __name__ == "__main__":
+    main()

+ 155 - 0
tests/mytest/models/t_mongo_filter.py

@@ -0,0 +1,155 @@
+from typing import Dict, Any
+from pymongo import MongoClient
+from pydantic import BaseModel
+from config.settings import MONGO_URL, MONGO_DB_NAME
+
+class MongoFilterConfig(BaseModel):
+    """MongoDB 查询配置模型"""
+    name: str
+    description: str
+    collection: str
+    query: Dict[str, Any]
+    projection: Dict[str, Any]
+
+class MongoFilter:
+    """MongoDB 字段筛选器"""
+    
+    def __init__(self, mongo_uri: str, db_name: str):
+        """
+        初始化 MongoDB 连接
+        
+        Args:
+            mongo_uri: MongoDB 连接字符串
+            db_name: 数据库名称
+        """
+        self.client = MongoClient(mongo_uri)
+        self.db = self.client[db_name]
+        self.config_collection = self.db["query_configs"]
+        
+    async def save_query_config(self, config: MongoFilterConfig):
+        """
+        保存查询配置到 MongoDB
+        
+        Args:
+            config: 查询配置对象
+        """
+        self.config_collection.update_one(
+            {"name": config.name},
+            {"$set": config.dict()},
+            upsert=True
+        )
+    
+    async def get_product_basic_info(self, product_id: str) -> Dict[str, Any]:
+        """
+        获取产品基础信息
+        
+        Args:
+            product_id: 产品ID
+            
+        Returns:
+            产品基础信息字典
+        """
+        config = await self.get_query_config("product_basic_info")
+        result = self.db[config.collection].find_one(
+            {"_id": product_id},
+            config.projection
+        )
+        return result
+    
+    async def get_competitor_data(self, product_id: str) -> Dict[str, Any]:
+        """
+        获取竞品数据
+        
+        Args:
+            product_id: 产品ID
+            
+        Returns:
+            竞品数据字典
+        """
+        config = await self.get_query_config("competitor_data")
+        result = self.db[config.collection].find_one(
+            {"_id": product_id},
+            config.projection
+        )
+        return result
+    
+    async def get_query_config(self, name: str) -> MongoFilterConfig:
+        """
+        从配置获取查询配置
+        
+        Args:
+            name: 配置名称
+            
+        Returns:
+            查询配置对象
+        """
+        config_data = self.config_collection.find_one({"name": name})
+        if not config_data:
+            # 如果配置不存在,创建默认配置
+            if name == "product_basic_info":
+                config_data = {
+                    "name": "product_basic_info",
+                    "description": "产品基础信息查询",
+                    "collection": "products",
+                    "query": {},
+                    "projection": {
+                        "basic_info": 1,
+                        "_id": 0
+                    }
+                }
+            elif name == "competitor_data":
+                config_data = {
+                    "name": "competitor_data",
+                    "description": "竞品数据查询",
+                    "collection": "products",
+                    "query": {},
+                    "projection": {
+                        "competitor_crawl_data": 1,
+                        "_id": 0
+                    }
+                }
+            self.config_collection.insert_one(config_data)
+        
+        return MongoFilterConfig(**config_data)
+
+# 示例用法
+if __name__ == "__main__":
+    import asyncio
+    
+    async def demo():
+        filter = MongoFilter(MONGO_URL, MONGO_DB_NAME)
+        
+        # 保存查询配置
+        basic_info_config = MongoFilterConfig(
+            name="product_basic_info",
+            description="产品基础信息查询",
+            collection="products",
+            query={},
+            projection={
+                "basic_info": 1,
+                "_id": 0
+            }
+        )
+        await filter.save_query_config(basic_info_config)
+        
+        competitor_config = MongoFilterConfig(
+            name="competitor_data",
+            description="竞品数据查询",
+            collection="products",
+            query={},
+            projection={
+                "competitor_crawl_data": 1,
+                "_id": 0
+            }
+        )
+        await filter.save_query_config(competitor_config)
+        
+        # 使用配置查询
+        product_id = "67df296f7e9e7b40ae98fbf8"
+        basic_info = await filter.get_product_basic_info(product_id)
+        print("产品基础信息:", basic_info)
+        
+        competitor_data = await filter.get_competitor_data(product_id)
+        print("竞品数据:", competitor_data)
+    
+    asyncio.run(demo())

+ 95 - 0
tests/mytest/models/t_mongo_template_service.py

@@ -0,0 +1,95 @@
+import asyncio
+from src.models.template_model import Template, TemplateType
+from src.manager.template_manager import TemplateManager, TemplateService
+from src.manager.core.db_mongo import BaseMongoManager
+from bson import json_util
+import json
+from utils.logu import get_logger
+
+logger = get_logger('test', console_level='DEBUG')
+
+import asyncio
+import aiofiles
+import os
+import sys
+product_info_pipeline = [
+    {
+        '$match': {
+            'basic_info.name': '{{product_name}}'
+        } 
+    },
+    {
+        '$project': {
+            'basic_info': 1, 
+            '_id': 0
+        }
+    }
+]
+filter_competior_by_name = [
+    {
+        '$match': {
+            'basic_info.name': "{{ product_name }}"
+        }  
+    },
+    {
+        '$project': {
+            'competitor_crawl_data': 1
+        }
+    }, {
+        '$addFields': {
+            'competitors': {
+                '$objectToArray': '$competitor_crawl_data'
+            }
+        }
+    }, {
+        '$unwind': '$competitors'
+    }, {
+        '$project': {
+            '_id': 0, 
+            'asin': '$competitors.k', 
+            'product_info': {
+                'main_text': '$competitors.v.extra_result.product_info.main_text'
+            }, 
+            'result_table': {
+                '$map': {
+                    'input': '$competitors.v.extra_result.result_table', 
+                    'as': 'item', 
+                    'in': {
+                        'traffic_keyword': '$$item.traffic_keyword', 
+                        'monthly_searches': '$$item.monthly_searches'
+                    }
+                }
+            }
+        }
+    }
+]
+async def create_or_update_template():
+    manager = TemplateManager()
+    await manager.initialize()
+    template = await manager.create_or_update_template(
+        name="product_info",
+        template_str=json.dumps(product_info_pipeline),
+        template_type=TemplateType.AGGREGATION,
+        description="产品信息查询模板"
+    )
+    await manager.create_or_update_template(
+        name="competitor_for_llm",
+        template_str=json.dumps(filter_competior_by_name),
+        template_type=TemplateType.AGGREGATION,
+        description="竞品数据查询模板,筛选出主要信息"
+    )
+    logger.info(f"Created template: {template}")
+
+async def task():
+    manager = TemplateManager()
+    await manager.initialize()
+    template = await manager.get_template("product_info")
+    logger.info(f"product_info template: {template}")
+    template = await manager.get_template("competitor_for_llm")
+    logger.info(f"competitor_for_llm template: {template}")
+    
+def main():
+    asyncio.run(task())
+
+if __name__ == "__main__":
+    main()

+ 138 - 0
tests/mytest/models/t_pymongo.py

@@ -0,0 +1,138 @@
+import json
+from src.models.product_model import Product
+from src.models.config_model import UserConfig
+from src.models.ai_execution_record import (
+    CompetitorKeywordAnalysis, MarketingContentGeneration, LLMConfig,
+    AICompetitorAnalyzeMainKeywordsResult,
+    MarketingInfo,)
+from src.manager.core.db_mongo import BaseMongoManager
+import asyncio
+from utils.logu import get_logger
+from src.models.field_config import FieldConfig, get_field_descriptions
+logger = get_logger('test')
+
+async def example_usage():
+    """更新后的示例调用代码"""
+    db_manager = BaseMongoManager(db_name='test')
+    await db_manager.initialize()
+    db = db_manager.db
+    # 使用 beanie 语法
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    # 使用 pymongo 语法
+    collection = db['Product']
+    result = await collection.find_one(
+    { "basic_info.name": "电线保护套" },
+    { "basic_info": 1, "_id": 0 }  # 投影
+)
+    print(result)
+    result = await db.Product.find_one(
+    { "basic_info.name": "电线保护套" },
+    { "basic_info": 1, "_id": 0 }
+    )
+    print(result)
+async def query_demo():
+    """pymongo 示例调用代码"""
+    db_manager = BaseMongoManager(db_name='test')
+    await db_manager.initialize()
+    db = db_manager.db
+    query_find_one = {
+    "filter": {"basic_info.name": "电线保护套"},
+    "projection": {"basic_info": 1, "_id": 0}
+}   
+    result = await db.Product.find_one(
+       **query_find_one 
+    )
+    print(result)
+
+async def aggregate_demo():
+    db_manager = BaseMongoManager(db_name='test')
+    await db_manager.initialize()
+    db = db_manager.db
+    pipeline = [
+    {
+        '$project': {
+            'competitor_crawl_data': 1
+        }
+    }, {
+        '$addFields': {
+            'competitors': {
+                '$objectToArray': '$competitor_crawl_data'
+            }
+        }
+    }, {
+        '$unwind': '$competitors'
+    }, {
+        '$project': {
+            '_id': 0, 
+            'asin': '$competitors.k', 
+            'product_info': {
+                'main_text': '$competitors.v.extra_result.product_info.main_text'
+            }, 
+            'result_table': {
+                '$map': {
+                    'input': '$competitors.v.extra_result.result_table', 
+                    'as': 'item', 
+                    'in': {
+                        'traffic_keyword': '$$item.traffic_keyword', 
+                        'monthly_searches': '$$item.monthly_searches'
+                    }
+                }
+            }
+        }
+    }
+]
+    
+    result = await db.Product.aggregate(pipeline).to_list()
+    print(json.dumps(result, indent=4, ensure_ascii=False))
+
+
+async def filter_aggregate_demo():
+    db_manager = BaseMongoManager(db_name='test')
+    await db_manager.initialize()
+    db = db_manager.db
+    filter_competior_by_name = [
+    {
+        '$match': {
+            'basic_info.name': "{{ product_name }}"
+        }  
+    },
+    {
+        '$project': {
+            'competitor_crawl_data': 1
+        }
+    }, {
+        '$addFields': {
+            'competitors': {
+                '$objectToArray': '$competitor_crawl_data'
+            }
+        }
+    }, {
+        '$unwind': '$competitors'
+    }, {
+        '$project': {
+            '_id': 0, 
+            'asin': '$competitors.k', 
+            'product_info': {
+                'main_text': '$competitors.v.extra_result.product_info.main_text'
+            }, 
+            'result_table': {
+                '$map': {
+                    'input': '$competitors.v.extra_result.result_table', 
+                    'as': 'item', 
+                    'in': {
+                        'traffic_keyword': '$$item.traffic_keyword', 
+                        'monthly_searches': '$$item.monthly_searches'
+                    }
+                }
+            }
+        }
+    }
+]
+    result = await db.Product.aggregate(filter_competior_by_name).to_list()
+    print(json.dumps(result, indent=4, ensure_ascii=False))
+
+def main():
+    asyncio.run(filter_aggregate_demo())
+
+if __name__ == "__main__":
+    main()

+ 101 - 0
tests/mytest/models/t_pymongo_template.py

@@ -0,0 +1,101 @@
+from jinja2 import Environment, BaseLoader
+import json
+from bson import json_util
+from src.manager.core.db_mongo import BaseMongoManager
+import asyncio
+from utils.logu import get_logger
+from src.models.field_config import FieldConfig, get_field_descriptions
+
+logger = get_logger('test')
+
+class MongoAggregationTemplate:
+    def __init__(self):
+        self.env = Environment(loader=BaseLoader())
+        # 添加自定义过滤器
+        self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
+    
+    def to_template_string(self, pipeline):
+        """将聚合管道转换为模板字符串"""
+        return json.dumps(pipeline, default=json_util.default)
+    
+    def render(self, template_str, context):
+        """渲染模板字符串为可执行的聚合管道"""
+        template = self.env.from_string(template_str)
+        rendered = template.render(**context)
+        
+        try:
+            return json.loads(rendered, object_hook=json_util.object_hook)
+        except json.JSONDecodeError as e:
+            raise ValueError(f"Invalid JSON after rendering: {e}\nRendered content:\n{rendered}")
+
+async def filter_aggregate_demo(product_name="电线保护套"):
+    # 初始化模板处理器
+    template_processor = MongoAggregationTemplate()
+    
+    # 定义聚合管道模板
+    filter_competior_by_name = [
+        {
+            '$match': {
+                'basic_info.name': "{{ product_name }}"
+            }  
+        },
+        {
+            '$project': {
+                'competitor_crawl_data': 1
+            }
+        }, {
+            '$addFields': {
+                'competitors': {
+                    '$objectToArray': '$competitor_crawl_data'
+                }
+            }
+        }, {
+            '$unwind': '$competitors'
+        }, {
+            '$project': {
+                '_id': 0, 
+                'asin': '$competitors.k', 
+                'product_info': {
+                    'main_text': '$competitors.v.extra_result.product_info.main_text'
+                }, 
+                'result_table': {
+                    '$map': {
+                        'input': '$competitors.v.extra_result.result_table', 
+                        'as': 'item', 
+                        'in': {
+                            'traffic_keyword': '$$item.traffic_keyword', 
+                            'monthly_searches': '$$item.monthly_searches'
+                        }
+                    }
+                }
+            }
+        }
+    ]
+    
+    # 将聚合管道转换为模板字符串
+    template_str = template_processor.to_template_string(filter_competior_by_name)
+    logger.info(f"Template string: {template_str}")
+    
+    # 初始化数据库
+    db_manager = BaseMongoManager(db_name='test')
+    await db_manager.initialize()
+    db = db_manager.db
+    
+    # 渲染模板
+    try:
+        pipeline = template_processor.render(template_str, {
+            "product_name": product_name,
+            # 可以添加更多变量
+        })
+        logger.info(f"Rendered pipeline: {json.dumps(pipeline, indent=2, ensure_ascii=False)}")
+    except ValueError as e:
+        logger.error(f"Template rendering failed: {e}")
+        return
+    
+    # 执行聚合查询
+    result = await db.Product.aggregate(pipeline).to_list()
+    logger.info(json.dumps(result, ensure_ascii=False))
+
+# 测试代码
+if __name__ == "__main__":
+    asyncio.run(filter_aggregate_demo())

+ 127 - 0
tests/test_template.py

@@ -0,0 +1,127 @@
+import pytest
+import pytest_asyncio
+import asyncio
+from src.models.template_model import Template, TemplateType
+from src.manager.template_manager import TemplateManager, TemplateService
+from src.manager.core.db_mongo import BaseMongoManager
+from bson import json_util
+import json
+from utils.logu import get_logger
+
+logger = get_logger('test', console_level='DEBUG')
+
+@pytest.fixture(scope="module")
+def event_loop():
+    """Create an instance of the default event loop for each test case."""
+    loop = asyncio.get_event_loop_policy().new_event_loop()
+    yield loop
+    loop.close()
+
+@pytest_asyncio.fixture(scope="module")
+async def template_manager(event_loop):
+    # 确保在同一个事件循环中执行
+    logger.info("Initializing template manager...")
+    manager = TemplateManager()
+    await manager.initialize()
+    try:
+        yield manager
+    finally:
+        # 清理测试数据
+        logger.info("Cleaning up test data...")
+        await Template.find().delete()
+
+@pytest.mark.asyncio
+async def test_template_crud(template_manager):
+    logger.info("Running test_template_crud...")
+    
+    # 测试创建模板
+    logger.debug("Creating template...")
+    template = await template_manager.create_template(
+        name="test_agg",
+        template_str=json.dumps([{"$match": {"name": "{{product_name}}"}}]),
+        template_type=TemplateType.AGGREGATION,
+        description="测试聚合模板"
+    )
+    logger.info(f"Created template: {template}")
+    assert template.name == "test_agg"
+    
+    # 测试获取模板
+    logger.debug("Fetching template...")
+    fetched = await template_manager.get_template("test_agg")
+    logger.info(f"Fetched template: {fetched}")
+    assert fetched is not None
+    assert fetched.description == "测试聚合模板"
+    
+    # 测试更新模板
+    logger.debug("Updating template...")
+    updated = await template_manager.update_template(
+        "test_agg",
+        new_description="更新后的描述"
+    )
+    logger.info(f"Updated template: {updated}")
+    assert updated.description == "更新后的描述"
+    
+    # 测试删除模板
+    logger.debug("Deleting template...")
+    result = await template_manager.delete_template("test_agg")
+    logger.info(f"Delete result: {result}")
+    assert result is True
+    assert await template_manager.get_template("test_agg") is None
+
+@pytest.mark.asyncio
+async def test_template_render(template_manager):
+    logger.info("Running test_template_render...")
+    
+    # 创建测试模板
+    logger.debug("Creating template for rendering...")
+    await template_manager.create_template(
+        name="render_test",
+        template_str=json.dumps([{"$match": {"name": "{{product_name}}"}}]),
+        template_type=TemplateType.AGGREGATION
+    )
+    
+    # 测试渲染
+    logger.debug("Rendering template...")
+    context = {"product_name": "电线保护套"}
+    pipeline = template_manager.render_template(
+        '[{"$match": {"name": "{{product_name}}"}}]',
+        context
+    )
+    logger.info(f"Rendered pipeline: {pipeline}")
+    assert pipeline == [{"$match": {"name": "电线保护套"}}]
+
+@pytest.mark.asyncio
+async def test_template_execute(template_manager):
+    logger.info("Running test_template_execute...")
+    
+    # 创建测试模板
+    logger.debug("Creating template for execution...")
+    await template_manager.create_template(
+        name="exec_test",
+        template_str=json.dumps([{"$match": {"basic_info.name": "{{product_name}}"}}]),
+        template_type=TemplateType.AGGREGATION
+    )
+    
+    # 测试执行(需要mock数据库或已有测试数据)
+    try:
+        logger.debug("Executing template...")
+        result = await template_manager.execute_template(
+            "exec_test",
+            {"product_name": "电线保护套"}
+        )
+        logger.info(f"Execution result: {result}")
+        assert isinstance(result, list)
+    except Exception as e:
+        logger.warning(f"Skipping test due to missing MongoDB or test data: {e}")
+        pytest.skip(f"需要有效的MongoDB连接或测试数据: {e}")
+
+@pytest.mark.asyncio
+async def test_template_service():
+    logger.info("Running test_template_service...")
+    
+    # 测试服务单例
+    logger.debug("Getting TemplateService instance...")
+    service1 = await TemplateService.get_instance()
+    service2 = await TemplateService.get_instance()
+    logger.info(f"Service1: {service1}, Service2: {service2}")
+    assert service1 is service2