
WIP: added user-level prompt configuration, but the AI analysis result fields may need to be re-architected

mrh 8 months ago
parent
commit
ea3a0eeaf2

+ 2 - 1
README.md

@@ -19,7 +19,8 @@ restic ls -l 04c9313f
 G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.exe --uri="mongodb://sv-v2:27017/amazone" --collection=Product -o G:\code\amazone\copywriting_production\output\temp
 $timestr = Get-Date -Format "yyyyMMdd_HH_mm_ss"
 echo $timestr
-G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/" --nsInclude="amazone.Product" --nsFrom="amazone.Product" --nsTo="backup_amazone.Product123" G:\code\amazone\copywriting_production\output\temp\amazone\Product.bson --drop
+# Restore into the test database
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/test" I:\eng\backup\mongo\20250327_01_04_08\amazone\Product.bson
 
 G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/backup_amazone" --collection=Product G:\code\amazone\copywriting_production\output\temp
 mongoimport --uri="mongodb://sv-v2:27017" --collection=users --file=G:\code\amazone\copywriting_production\output\mongodb_backup\users.json

+ 46 - 0
docs/gpt/agent_product.md

@@ -1,3 +1,49 @@
+# Template-based AI prompt configuration
+@/src\models\product_model.py 
+@/src\models\config_model.py 
+@/docs\gpt\website\beanie-odm-link.md 
+
+I think the marketing_template_name and competitor_template_name fields of the product AIAnalyzeCompare collection would be better modeled as direct document references. Can beanie-odm load the linked data automatically?
+
+Does AIPromptConfig in the userconfig collection need to become a Document before it can be referenced externally?
+Also keep track of which User each AIPromptConfig belongs to.
+
+Please use standard beanie-odm patterns; see the reference document:
+beanie-odm-link.md 
+
+The new collection and document structures should go in a new file.
+# AI prompt templating
+@/src\models\product_model.py 
+@/src\ai\agent_product.py 
+@/docs\gpt\llmaindex_prompt.md 
+@/src\manager\manager_task.py 
+
+While analyzing operations data with LLMs, I want to compare multiple versions of the AI analysis results, so some fields of the MongoDB Product document need to hold several alternative results. The data in fields such as product.basic_info, product.competitor_crawl_data, and variants is fixed, but marketing and competitor_analyze are produced by AI; I may use different models, or different prompts, to find the best analysis result for each case.
+For example, results produced by different models:
+deepseek-r1:
+- marketing
+- competitor_analyze
+
+doubao:
+- marketing
+...
+Results produced by different prompts:
+```
+product.competitor_analyze = get_from_deepseek_r1()
+model = "deepseek-r1"
+prompt = f"I am an Amazon seller, ... data  {product.basic_info}  {product.competitor_analyze} ..."
+```
+
+```
+product.competitor_analyze = get_from_doubao_competitor_analyze()
+model = "doubao"
+prompt = f"I am an Amazon seller, ... data  {product.basic_info}  {product.competitor_analyze} ..."
+product.marketing = ai_analyze_marketing(model, prompt)
+```
+
+Add a new method in the agent to format prompts from _prepare_prompt_template. It must follow the LlamaIndex template approach.
+
+
 # Keyword generation
 Official guidance for recommended keywords:
 Keep it concise: although 200 characters is the maximum allowed, we recommend using 80 or fewer, because mobile screens truncate longer product titles. Avoid redundancy: do not include redundant information, unnecessary synonyms, or excessive keywords in the product title. Optimize word order: include only information that helps buyers quickly identify and understand the product, ordering the words so the most important information comes first. Where applicable, consider this order: brand name - flavor/style - product type name - key attribute (the product's unique selling point) - color - size/pack count - model number.
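The recommended word order above can be sketched as a small helper; the field names and sample values below are hypothetical illustrations, not part of this repo:

```python
# Hypothetical sketch: assemble a product title in the recommended order:
# brand - flavor/style - product type - key attribute - color - size/pack count - model.
RECOMMENDED_ORDER = [
    "brand", "style", "product_type", "key_attribute", "color", "pack", "model",
]

def build_title(info: dict, max_len: int = 80) -> str:
    """Join available fields in the recommended order, then trim to max_len."""
    parts = [info[key] for key in RECOMMENDED_ORDER if info.get(key)]
    return " ".join(parts)[:max_len]

print(build_title({
    "brand": "Acme",
    "product_type": "Cable Protector",
    "key_attribute": "Anti-Chew",
    "color": "Black",
    "pack": "4-Pack",
}))  # Acme Cable Protector Anti-Chew Black 4-Pack
```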

+ 226 - 0
docs/gpt/website/beanie-odm-link.md

@@ -0,0 +1,226 @@
+Relations
+The document can contain links to other documents in their fields.
+
+Only top-level fields are fully supported for now.
+
+The following field types are supported:
+
+Link[...]
+Optional[Link[...]]
+List[Link[...]]
+Optional[List[Link[...]]]
+Also, backward links are supported:
+
+BackLink[...]
+Optional[BackLink[...]]
+List[BackLink[...]]
+Optional[List[BackLink[...]]]
+Direct link to the document:
+
+from beanie import Document, Link
+
+
+class Door(Document):
+    height: int = 2
+    width: int = 1
+
+
+class House(Document):
+    name: str
+    door: Link[Door]
+Optional direct link to the document:
+
+from typing import Optional
+
+from beanie import Document, Link
+
+
+class Door(Document):
+    height: int = 2
+    width: int = 1
+
+
+class House(Document):
+    name: str
+    door: Optional[Link[Door]]
+List of the links:
+
+from typing import List
+
+from beanie import Document, Link
+
+
+class Window(Document):
+    x: int = 10
+    y: int = 10
+
+
+class House(Document):
+    name: str
+    door: Link[Door]
+    windows: List[Link[Window]]
+Optional list of the links:
+
+from typing import List, Optional
+
+from beanie import Document, Link
+
+class Window(Document):
+    x: int = 10
+    y: int = 10
+
+class Yard(Document):
+    v: int = 10
+    y: int = 10
+
+class House(Document):
+    name: str
+    door: Link[Door]
+    windows: List[Link[Window]]
+    yards: Optional[List[Link[Yard]]]
+Other link patterns are not supported at this moment. If you need something more specific for your use-case, please open an issue on the GitHub page - https://github.com/roman-right/beanie
+
+Write
+The following write methods support relations:
+
+insert(...)
+replace(...)
+save(...)
+To apply a write method to the linked documents, you should pass the respective link_rule argument
+
+house.windows = [Window(x=100, y=100)]
+house.name = "NEW NAME"
+
+# The next call will insert a new window object and replace the house instance with updated data
+await house.save(link_rule=WriteRules.WRITE)
+
+# `insert` and `replace` methods will work the same way
+Otherwise, Beanie can ignore internal links with the link_rule parameter WriteRules.DO_NOTHING
+
+house.door.height = 3
+house.name = "NEW NAME"
+
+# The next call will just replace the house instance with new data, but the linked door object will not be synced
+await house.replace(link_rule=WriteRules.DO_NOTHING)
+
+# `insert` and `save` methods will work the same way
+Fetch
+Prefetch
+You can fetch linked documents on the find query step using the fetch_links parameter
+
+houses = await House.find(
+    House.name == "test", 
+    fetch_links=True
+).to_list()
+Supported find methods: find, find_one, get
+Beanie uses a single aggregation query under the hood to fetch all the linked documents, which makes this operation efficient.
+
+If a direct link refers to a non-existent document, it will remain a Link object after fetching.
+
+Fetching will ignore non-existent documents for the list of links fields.
+
+Search by linked documents fields
+If the fetch_links parameter is set to True, search by linked documents fields is available.
+
+By field of the direct link:
+
+houses = await House.find(
+    House.door.height == 2,
+    fetch_links=True
+).to_list()
+By list of links:
+
+houses = await House.find(
+    House.windows.x > 10,
+    fetch_links=True
+).to_list()
+Search by id of the linked documents works using the following syntax:
+
+houses = await House.find(
+    House.door.id == PydanticObjectId("DOOR_ID_HERE")
+).to_list()
+It works the same way with fetch_links equal to True and False and for find_many and find_one methods.
+
+Nested links
+With Beanie you can set up nested links; a document can even link to itself, which can lead to infinite recursion. To prevent this, or to reduce database load, you can limit the nesting depth during find operations.
+
+from beanie import Document, Link
+from typing import Optional
+
+class SelfLinkedSample(Document):
+    name: str
+    left: Optional[Link["SelfLinkedSample"]]
+    right: Optional[Link["SelfLinkedSample"]]
+You can set up depth for all linked documents independently of the field:
+
+await SelfLinkedSample.find(
+    SelfLinkedSample.name == "test",
+    fetch_links=True,
+    nesting_depth=2
+).to_list()
+Or you can set up depth for a specific field:
+
+await SelfLinkedSample.find(
+    SelfLinkedSample.name == "test",
+    fetch_links=True,
+    nesting_depths_per_field={
+        "left": 1,
+        "right": 2
+    }
+).to_list()
+Also, you can set up the maximum nesting depth at the document definition level; see the Beanie documentation on document definitions for details.
+
+On-demand fetch
+If you don't use prefetching, linked documents will be presented as objects of the Link class.
+
+You can fetch them manually afterwards.
+
+To fetch all the linked documents, you can use the fetch_all_links method
+
+await house.fetch_all_links()
+It will fetch all the linked documents and replace Link objects with them.
+
+Otherwise, you can fetch a single field:
+
+await house.fetch_link(House.door)
+This will fetch the Door object and put it into the door field of the house object.
+
+Delete
+Delete method works the same way as write operations, but it uses other rules.
+
+To delete all the links on the document deletion, you should use the DeleteRules.DELETE_LINKS value for the link_rule parameter:
+
+await house.delete(link_rule=DeleteRules.DELETE_LINKS)
+To keep linked documents, you can use the DO_NOTHING rule:
+
+await house.delete(link_rule=DeleteRules.DO_NOTHING)
+Back Links
+To initialize a back link, you need another document that holds a direct link, or a list of links, to the current document.
+
+from typing import List
+
+from beanie import Document, BackLink, Link
+from pydantic import Field
+
+
+class House(Document):
+    name: str
+    door: Link["Door"]
+    owners: List[Link["Person"]]
+
+
+class Door(Document):
+    height: int = 2
+    width: int = 1
+    house: BackLink[House] = Field(original_field="door")
+
+
+class Person(Document):
+    name: str
+    house: List[BackLink[House]] = Field(original_field="owners")
+The original_field parameter is required for the back link field.
+
+Back links support all the operations that normal links support, but they are virtual. This means that when searching the database you must include fetch_links=True (see Finding documents), or you will receive an empty virtual BackLink object. It is not possible to fetch() this virtual link after the initial search.
+
+Limitations
+Find operations with the fetch_links parameter cannot be chained with delete and update methods.

+ 122 - 0
docs/gpt/website/llmaindex_prompt.md

@@ -0,0 +1,122 @@
+https://docs.llamaindex.ai/en/stable/examples/prompts/advanced_prompts/
+1. Partial Formatting
+Partial formatting (partial_format) allows you to partially format a prompt, filling in some variables while leaving others to be filled in later.
+
+This is a nice convenience function so you don't have to maintain all the required prompt variables all the way down to format, you can partially format as they come in.
+
+This will create a copy of the prompt template.
+
+qa_prompt_tmpl_str = """\
+Context information is below.
+---------------------
+{context_str}
+---------------------
+Given the context information and not prior knowledge, answer the query.
+Please write the answer in the style of {tone_name}
+Query: {query_str}
+Answer: \
+"""
+
+prompt_tmpl = PromptTemplate(qa_prompt_tmpl_str)
+partial_prompt_tmpl = prompt_tmpl.partial_format(tone_name="Shakespeare")
+partial_prompt_tmpl.kwargs
+{'tone_name': 'Shakespeare'}
+fmt_prompt = partial_prompt_tmpl.format(
+    context_str="In this work, we develop and release Llama 2, a collection of pretrained and fine-tuned large language models (LLMs) ranging in scale from 7 billion to 70 billion parameters",
+    query_str="How many params does llama 2 have",
+)
+print(fmt_prompt)
+Context information is below.
+---------------------
+In this work, we develop and release Llama 2, a collection of pretrained and fine-tuned large language models (LLMs) ranging in scale from 7 billion to 70 billion parameters
+---------------------
+Given the context information and not prior knowledge, answer the query.
+Please write the answer in the style of Shakespeare
+Query: How many params does llama 2 have
+Answer: 
+2. Prompt Template Variable Mappings
+Template var mappings allow you to specify a mapping from the "expected" prompt keys (e.g. context_str and query_str for response synthesis) to the keys actually used in your template.
+
+This allows you to re-use existing string templates without having to swap out the template variables.
+
+# NOTE: here notice we use `my_context` and `my_query` as template variables
+
+qa_prompt_tmpl_str = """\
+Context information is below.
+---------------------
+{my_context}
+---------------------
+Given the context information and not prior knowledge, answer the query.
+Query: {my_query}
+Answer: \
+"""
+
+template_var_mappings = {"context_str": "my_context", "query_str": "my_query"}
+
+prompt_tmpl = PromptTemplate(
+    qa_prompt_tmpl_str, template_var_mappings=template_var_mappings
+)
+fmt_prompt = prompt_tmpl.format(
+    context_str="In this work, we develop and release Llama 2, a collection of pretrained and fine-tuned large language models (LLMs) ranging in scale from 7 billion to 70 billion parameters",
+    query_str="How many params does llama 2 have",
+)
+print(fmt_prompt)
+Context information is below.
+---------------------
+In this work, we develop and release Llama 2, a collection of pretrained and fine-tuned large language models (LLMs) ranging in scale from 7 billion to 70 billion parameters
+---------------------
+Given the context information and not prior knowledge, answer the query.
+Query: How many params does llama 2 have
+Answer: 
+3. Prompt Function Mappings
+You can also pass in functions as template variables instead of fixed values.
+
+This allows you to dynamically inject certain values, dependent on other values, during query-time.
+
+Here are some basic examples. We show more advanced examples (e.g. few-shot examples) in our Prompt Engineering for RAG guide.
+
+qa_prompt_tmpl_str = """\
+Context information is below.
+---------------------
+{context_str}
+---------------------
+Given the context information and not prior knowledge, answer the query.
+Query: {query_str}
+Answer: \
+"""
+
+
+def format_context_fn(**kwargs):
+    # format context with bullet points
+    context_list = kwargs["context_str"].split("\n\n")
+    fmtted_context = "\n\n".join([f"- {c}" for c in context_list])
+    return fmtted_context
+
+
+prompt_tmpl = PromptTemplate(
+    qa_prompt_tmpl_str, function_mappings={"context_str": format_context_fn}
+)
+context_str = """\
+In this work, we develop and release Llama 2, a collection of pretrained and fine-tuned large language models (LLMs) ranging in scale from 7 billion to 70 billion parameters.
+
+Our fine-tuned LLMs, called Llama 2-Chat, are optimized for dialogue use cases.
+
+Our models outperform open-source chat models on most benchmarks we tested, and based on our human evaluations for helpfulness and safety, may be a suitable substitute for closed-source models.
+"""
+
+fmt_prompt = prompt_tmpl.format(
+    context_str=context_str, query_str="How many params does llama 2 have"
+)
+print(fmt_prompt)
+Context information is below.
+---------------------
+- In this work, we develop and release Llama 2, a collection of pretrained and fine-tuned large language models (LLMs) ranging in scale from 7 billion to 70 billion parameters.
+
+- Our fine-tuned LLMs, called Llama 2-Chat, are optimized for dialogue use cases.
+
+- Our models outperform open-source chat models on most benchmarks we tested, and based on our human evaluations for helpfulness and safety, may be a suitable substitute for closed-source models.
+
+---------------------
+Given the context information and not prior knowledge, answer the query.
+Query: How many params does llama 2 have
+Answer: 
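The partial formatting, variable mappings, and function mappings shown above all reduce to dict merging plus str.format. A dependency-free sketch of the function-mapping idea (render here is a simplified stand-in for PromptTemplate.format, not llama_index API):

```python
def format_context(context_str: str) -> str:
    # Reformat context paragraphs as bullet points, as in the example above.
    paragraphs = context_str.split("\n\n")
    return "\n\n".join(f"- {p}" for p in paragraphs)

def render(template: str, function_mappings: dict, **kwargs) -> str:
    # Apply function mappings to matching variables before the final format call.
    resolved = {
        key: function_mappings[key](value) if key in function_mappings else value
        for key, value in kwargs.items()
    }
    return template.format(**resolved)

template = "Context:\n{context_str}\nQuery: {query_str}\nAnswer: "
prompt = render(
    template,
    {"context_str": format_context},
    context_str="First fact.\n\nSecond fact.",
    query_str="What is known?",
)
print(prompt)
```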

+ 34 - 0
docs/gpt/数据库架构师.md

@@ -1,3 +1,37 @@
+
+
+# Multi-version AI analysis (external links would add complexity; a MongoDB document can be large, so linking out is not required)
+@/src\models\product_model.py 
+@/src\ai\agent_product.py 
+@/src\manager\manager_task.py 
+
+While analyzing operations data with LLMs, I want to compare multiple versions of the AI analysis results, so some fields of the MongoDB Product document need to hold several alternative results. The data in fields such as product.basic_info, product.competitor_crawl_data, and variants is fixed, but marketing and competitor_analyze are produced by AI; I may use different models, or different prompts, to find the best analysis result for each case.
+For example, results produced by different models:
+deepseek-r1:
+- marketing
+- competitor_analyze
+
+doubao:
+- marketing
+...
+Results produced by different prompts:
+```
+product.competitor_analyze = get_from_deepseek_r1()
+model = "deepseek-r1"
+prompt = f"I am an Amazon seller, ... data  {product.basic_info}  {product.competitor_analyze} ..."
+```
+
+```
+product.competitor_analyze = get_from_doubao_competitor_analyze()
+model = "doubao"
+prompt = f"I am an Amazon seller, ... data  {product.basic_info}  {product.competitor_analyze} ..."
+product.marketing = ai_analyze_marketing(model, prompt)
+```
+
+Architecturally, MongoDB's flexibility makes this easy to extend. I defined the Product.ai_analysis_compare field, and I want to add a method in manager_task to batch-compare different models. agent_product.py should also resolve the relevant template fields from the template and keywords, using llamaindex's own features to generate the prompt.
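A minimal sketch of the embedded layout described above, using plain dicts; the key names (ai_analysis_compare, prompt, marketing) are illustrative assumptions rather than the repo's actual schema:

```python
# Keep one analysis result per model inside the product document itself;
# a MongoDB document can comfortably hold several versions side by side.
product = {
    "basic_info": {"name": "Cable Protector"},
    "ai_analysis_compare": {},  # keyed by model name
}

def record_analysis(product: dict, model: str, prompt: str, marketing: str) -> None:
    product["ai_analysis_compare"][model] = {
        "prompt": prompt,
        "marketing": marketing,
    }

record_analysis(product, "deepseek-r1", "prompt v1", "copy A")
record_analysis(product, "doubao", "prompt v1", "copy B")
print(sorted(product["ai_analysis_compare"]))  # ['deepseek-r1', 'doubao']
```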
+
+
+
 # AI analysis
 User: 
 I plan to build an operations tool in Python. It needs object storage, a database, and dynamic updates of JSON data. The content involves tables, images, HTML pages, product codes, keywords, product information, records of product copy, and long-tail keywords. Some data is collected from websites, some is generated by LLMs, some tables are computed and assembled, some markdown documents are produced by computation or by inserting links, and there are screenshot images. None of this data is fixed: when customer requirements change, I must add fields, images, tables, markdown documents, docx documents, mhtml files, and so on. LLM generation also uses embedding vectors.

+ 171 - 12
src/ai/agent_product.py

@@ -9,10 +9,11 @@ import sys
 from dotenv import load_dotenv
 from pydantic import BaseModel
 from src.models.product_model import (
-    Product, CompetitorCrawlData, AICompetitorAnalyzeMainKeywords,
+    AIAnalyzeCompare, Product, CompetitorCrawlData, AICompetitorAnalyzeMainKeywords,
     TrafficKeywordResult, ProductImageInfo,AICompetitorAnalyzeMainKeywordsResult,
-    SearchAmazoneKeyResult, ProductBaseInfo, Variant
+    SearchAmazoneKeyResult, ProductBaseInfo, Variant,MarketingInfo,
 )
+from src.models.config_model import (UserConfig, AIPromptConfig, )
 from llama_index.llms.openai import OpenAI
 from llama_index.llms.litellm import LiteLLM
 from src.manager.core.db_mongo import BaseMongoManager
@@ -86,6 +87,48 @@ def get_competitor_prompt_data(
             
     return list_data
 
+class PromptFormatter:
+    """Formatter for LLM prompt templates."""
+    def __init__(self, template: str, **kwargs):
+        self.template = template
+        self.kwargs = kwargs
+        self.partial_kwargs = {}
+        self.var_mappings = {}
+        self.function_mappings = {}
+
+    def partial_format(self, **kwargs) -> "PromptFormatter":
+        """Partially format the template."""
+        self.partial_kwargs.update(kwargs)
+        return self
+
+    def map_variables(self, **mappings) -> "PromptFormatter":
+        """Map template variable names."""
+        self.var_mappings.update(mappings)
+        return self
+
+    def map_functions(self, **functions) -> "PromptFormatter":
+        """Map template processing functions."""
+        self.function_mappings.update(functions)
+        return self
+
+    def format(self, **kwargs) -> str:
+        """Produce the final formatted prompt."""
+        # Merge all arguments
+        all_kwargs = {**self.partial_kwargs, **kwargs}
+        
+        # Apply variable-name mappings
+        mapped_kwargs = {}
+        for key, value in all_kwargs.items():
+            mapped_key = self.var_mappings.get(key, key)
+            mapped_kwargs[mapped_key] = value
+        
+        # Apply function mappings
+        for key, func in self.function_mappings.items():
+            if key in mapped_kwargs:
+                mapped_kwargs[key] = func(**mapped_kwargs)
+        
+        return self.template.format(**mapped_kwargs)
+
 class Formatter(ABC):
     """Abstract base class for formatters."""
     def __init__(self, notes: Optional[dict] = None):
@@ -151,9 +194,10 @@ class LiteLLMService(LLMService):
     async def analyze(self, prompt: str) -> Union[dict, str]:
         llm_kwargs = {}
         if self.format_type == "json":
-            if 'deepseek-r' not in self.model:
-                llm_kwargs["additional_kwargs"] = {"response_format": {"type": "json_object"}}
-
+            # if 'deepseek-r' not in self.model:
+                # llm_kwargs["additional_kwargs"] = {"response_format": {"type": "json_object"}}
+            prompt += "\nPlease make sure the output is a valid JSON object."
+        logger.info(f"{self.model} call kwargs: {llm_kwargs}")
         llm = LiteLLM(model=self.model, **llm_kwargs)
 
         for attempt in range(self.max_retries):
@@ -169,7 +213,7 @@ class LiteLLMService(LLMService):
                     else:
                         raise ValueError(f"Could not obtain a valid JSON response: {str(e)}")
             except Exception as e:
-                logger.error(f"LLM调用失败: {str(e)}")
+                logger.exception(f"LLM调用失败: {str(e)}")
                 raise
 
     def _process_response(self, response_text: str) -> Union[dict, str]:
@@ -193,12 +237,125 @@ class AnalysisService:
         self.llm_service:LiteLLMService = llm_service
         self.db_manager = db_manager
 
-    async def execute_analysis(self, product:Product, format_type: str = "json", dry_run=False) -> tuple[dict, str]:
-        prompt = await self._prepare_prompt(product, format_type)
-        logger.info(f"prompt: {prompt}")
+    async def execute_analysis(self, product:Product, format_type: str = "json", dry_run=False, template: Optional[AIPromptConfig] = None) -> tuple[dict, str]:
+        # if template:
+        #     formatter = PromptFormatter(template.template)
+        #     if template.keywords:
+        #         formatter.partial_format(**template.keywords)
+        #     prompt = formatter.format(
+        #         product=product,
+        #         format_type=format_type
+        #     )
+        # else:
+        #     prompt = await self._prepare_prompt(product, format_type)
+            
+        # logger.info(f"prompt: {prompt}")
+        # analysis_result = await self.llm_service.analyze(prompt)
+        # return analysis_result, prompt
+        pass
+
+    async def execute_marketing_analysis(self, product: Product, format_type: str = "json", template: Optional[AIPromptConfig] = None) -> tuple[MarketingInfo, str]:
+        """
+        Run the marketing-copy analysis.
+
+        Args:
+            product: product object
+            format_type: output format
+            template: custom prompt template
+
+        Returns:
+            (analysis result, prompt used)
+        """
+        if template:
+            formatter = PromptFormatter(template.template)
+            if template.keywords:
+                formatter.partial_format(**template.keywords)
+            prompt = formatter.format(
+                product=product,
+                format_type=format_type
+            )
+        else:
+            prompt = f'''I am an Amazon seller. Please write marketing copy for the product {product.basic_info.name}.
+
+Product information:
+{product.basic_info.model_dump_json(indent=2)}
+
+Requirements:
+- Highlight the selling points: {', '.join(product.basic_info.selling_point)}
+- Use a style suited to the Japanese market
+- Include an attractive title and a detailed description'''
+
+        logger.info(f"Marketing analysis prompt: {prompt}")
         analysis_result = await self.llm_service.analyze(prompt)
-        return analysis_result, prompt
+        
+        try:
+            marketing_info = MarketingInfo(**analysis_result)
+            return marketing_info, prompt
+        except Exception as e:
+            logger.error(f"Failed to parse marketing analysis result: {str(e)}")
+            raise ValueError("Marketing analysis result has an invalid format") from e
 
+    async def _prepare_competitor_prompt(self, product: Product, template: AIPromptConfig) -> str:
+        """Format the prompt using the llamaindex template approach.
+
+        Args:
+            product: product object
+            template: prompt template configuration
+
+        Returns:
+            the formatted prompt string
+        """
+        output_fields = get_field_descriptions(
+            AICompetitorAnalyzeMainKeywordsResult,
+            exclude=['results.crawl_result', 'results.created_at']
+            )
+        formatter = FormatterFactory.create_formatter(self.llm_service.format_type)
+        output_format = formatter.format(output_fields)
+
+        competitor_data = get_competitor_prompt_data(product)
+        basic_template = f'''Field descriptions:
+{get_field_descriptions(CompetitorCrawlData, include=['asin'])}
+{get_field_descriptions(ProductImageInfo, include=['main_text'])}
+{get_field_descriptions(TrafficKeywordResult, include=['traffic_keyword', 'monthly_searches'])}
+
+Competitor data:
+{competitor_data}
+
+My product information:
+{product.basic_info.model_dump_json(indent=2)}
+
+Return format:
+{output_format}
+----
+'''
+        template.template = basic_template + template.template
+        return template.template
+
+    @staticmethod
+    def convert_monthly_searches(value):
+        if value is None:
+            return None
+        if isinstance(value, str):
+            if not value.strip():
+                return None
+            return int(value.replace(',', ''))
+        return value
+    async def run_competitor_analysis(self, product: Product,
+        ai_analyze_compare_model: AIAnalyzeCompare,
+        format_type: str = 'json',):
+        prompt = await self._prepare_competitor_prompt(product, ai_analyze_compare_model.competitor_template)
+        logger.info(f"_prepare_competitor_prompt {prompt}")
+        analyze_result = await self.llm_service.analyze(prompt)
+        if 'results' in analyze_result:
+            for result in analyze_result['results']:
+                if 'monthly_searches' in result:
+                    result['monthly_searches'] = self.convert_monthly_searches(result['monthly_searches'])
+        
+        if 'tail_keys' in analyze_result:
+            for tail_key in analyze_result['tail_keys']:
+                if 'monthly_searches' in tail_key:
+                    tail_key['monthly_searches'] = self.convert_monthly_searches(tail_key['monthly_searches'])
+        return analyze_result
     async def _prepare_prompt(self, product: Product, format_type: str = "json", main_key_num: int = 3, tail_key_num:int = 12) -> str:
         competitor_data = get_competitor_prompt_data(product)
         # 从数据模型获取输出字段描述
@@ -216,9 +373,11 @@ class AnalysisService:
 
 Competitor data:
 {competitor_data}
-----
-我是日本站的亚马逊运营,我在给产品名称为 {product.basic_info.name} 选主要关键词,我的产品信息如下:
+
+My product information:
 {product.basic_info.model_dump_json(indent=2)}
+----
+I am an Amazon seller on the Japan marketplace, choosing main keywords and long-tail keywords for the product named {product.basic_info.name}.
 
 Based on the {len(competitor_data)} competitor records above, analyze according to the following rules:
 - Pick {main_key_num} shared keywords with a search volume above 10,000 as main keywords.
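The PromptFormatter added in this commit can be exercised standalone. A trimmed, self-contained usage sketch (the class body is re-declared here, without the function-mapping step, so the snippet runs on its own):

```python
class PromptFormatter:
    """Trimmed re-statement of the PromptFormatter from agent_product.py."""

    def __init__(self, template: str):
        self.template = template
        self.partial_kwargs = {}
        self.var_mappings = {}

    def partial_format(self, **kwargs) -> "PromptFormatter":
        # Fill in some variables now, leaving the rest for format().
        self.partial_kwargs.update(kwargs)
        return self

    def map_variables(self, **mappings) -> "PromptFormatter":
        # Map caller-facing names to the names actually used in the template.
        self.var_mappings.update(mappings)
        return self

    def format(self, **kwargs) -> str:
        merged = {**self.partial_kwargs, **kwargs}
        mapped = {self.var_mappings.get(k, k): v for k, v in merged.items()}
        return self.template.format(**mapped)

formatter = PromptFormatter("I am an Amazon seller. Data: {ctx} Task: {task}")
formatter.partial_format(task="pick main keywords")
prompt = formatter.map_variables(context_str="ctx").format(context_str="<basic_info>")
print(prompt)  # I am an Amazon seller. Data: <basic_info> Task: pick main keywords
```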

+ 3 - 3
src/manager/core/db_mongo.py

@@ -8,13 +8,13 @@ from motor.motor_asyncio import AsyncIOMotorClient
 from config.settings import MONGO_URL, MONGO_DB_NAME
 from src.models.product_model import Product
 from beanie.operators import Set, Rename
-
+from src.models.config_model import UserConfig, AIPromptConfig
 class BaseMongoManager:
     _instance = None
     _init_lock = asyncio.Lock()
     _is_initialized = False
 
-    def __new__(cls):
+    def __new__(cls, *args, **kwargs):
         if not cls._instance:
             cls._instance = super().__new__(cls)
             cls._instance.client = None
@@ -36,7 +36,7 @@ class BaseMongoManager:
             if not self._is_initialized:
                 if not hasattr(self, 'db') or self.db is None:
                     self.__init__()  # ensure client and db are initialized
-                await init_beanie(database=self.db, document_models=[Product])
+                await init_beanie(database=self.db, document_models=[Product, UserConfig])
                 self._is_initialized = True
 
     async def check_connection(self) -> bool:
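The `__new__(cls, *args, **kwargs)` change above matters because Python forwards constructor arguments to both __new__ and __init__; a minimal singleton sketch of the pattern:

```python
class Singleton:
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Without *args/**kwargs in the signature, Singleton("x") would raise
        # TypeError, because constructor arguments are forwarded to __new__ too.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, name: str = "default"):
        self.name = name

first = Singleton("a")
second = Singleton("b")
print(first is second)  # True
```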

+ 81 - 32
src/manager/manager_task.py

@@ -2,6 +2,7 @@ import asyncio
 from datetime import datetime
 import json
 from pathlib import Path
+from typing import List,Dict,Any
 from config.settings import CFG
 from src.manager.core.db import DbManager,AsinSeed
 from src.manager.core.db_mongo import BaseMongoManager
@@ -11,8 +12,12 @@ from config.celery import app
 from celery.result import AsyncResult
 from src.models.product_model import (
     Product,CompetitorCrawlData,AICompetitorAnalyzeMainKeywords, 
-    SearchAmazoneKeyResult, ProductBaseInfo, Variant,AICompetitorAnalyzeMainKeywordsResult
+    SearchAmazoneKeyResult, ProductBaseInfo, Variant,AICompetitorAnalyzeMainKeywordsResult,
+    AIAnalyzeCompare
     )
+from src.ai.agent_product import AnalysisService, LiteLLMService
+
+from src.models.config_model import (UserConfig, AIPromptConfig, )
 from src.models.field_config import FieldConfig
 from utils.logu import get_logger
 from upath import UPath
@@ -190,11 +195,9 @@ class ManagerTask:
             logger.info("Starting competitor keyword analysis...")
             
             # Initialize the AI service
-            # llm_service = LiteLLMService(model='openai/deepseek-reasoner',format_type='json')
             llm_service = LiteLLMService(model='openai/deepseek-chat',format_type='json')
             analysis_service = AnalysisService(llm_service, self.db_mongo)
             
-            
             # Run the actual analysis
             analyze_result, prompt = await analysis_service.execute_analysis(product, dry_run=dry_run)
             
@@ -223,8 +226,7 @@ class ManagerTask:
             logger.info(f"Prompt: {prompt}")
             logger.info(f"Analysis result: {product.competitor_analyze.model_dump_json(indent=2)}")
             if dry_run:
-                # logger.info(f"{product.competitor_analyze.model_dump_json(indent=2)}")
-                return 
+                return
             # Save to the database
             await product.save()
             logger.info("Competitor analysis result saved to MongoDB")
@@ -233,41 +235,88 @@ class ManagerTask:
             logger.error(f"Failed to save analysis result: {str(e)}")
             logger.exception("Full traceback:")
             return None
-async def test_product_mongo():
-    manager = ManagerTask()    
-    await manager.db_mongo.initialize()
-    product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    logger.info(f"{product}")
-    asin_completed = manager.db.get_asin_completed()
-    logger.info(f"{asin_completed}")
-    for asin_model in asin_completed:
-        if not product.competitor_crawl_data or asin_model.asin not in product.competitor_crawl_data:
-            logger.info(f"{asin_model.mhtml_path}")
-            mthml_data = read_file(asin_model.mhtml_path)
-            mhtml_path_name = Path(asin_model.mhtml_path).name
-            new_path = f's3://public/amazone/copywriting_production/output/asinseed/{mhtml_path_name}'
-            res = save_to_file(mthml_data, new_path)
-            logger.info(f"new path {res}")
-            continue
-            compet = CompetitorCrawlData(
-                sql_id=asin_model.id,
-                asin=asin_model.asin, 
-                asin_area=asin_model.asin_area, 
-                mhtml_path=asin_model.mhtml_path,
-                extra_result_path=asin_model.extra_result_path,
-                created_at=asin_model.created_at,)
-            product.competitor_crawl_data[asin_model.asin] = compet
+
+    async def compare_ai_analysis(
+        self,
+        product: Product,
+        model_configs: List[Dict[str, Any]],
+        overwrite: bool = False
+    ) -> Product:
+        """
+        Run AI analysis with multiple model configurations and save the results for comparison.
+
+        Args:
+            product: product object
+            model_configs: list of model configurations; each contains:
+                - model: model name (e.g. "deepseek-r1")
+                - introduce: model description
+                - competitor_template: competitor-analysis template
+                - marketing_template: marketing-analysis template
+            overwrite: whether to overwrite existing analyses
+
+        Returns:
+            the updated product object
+        """
+        if not overwrite and product.ai_analysis_compare:
+            logger.info(f"Product {product.basic_info.name} already has comparison results; skipping")
+            return product
+        
+        product.ai_analysis_compare = []
+        
+        for config in model_configs:
+            analysis_result = await self._run_single_analysis(product, config)
+            product.ai_analysis_compare.append(analysis_result)
+        
         await product.save()
-    return product
+        return product
+
+    async def _run_single_analysis(
+        self,
+        product: Product,
+        ai_analyze_compare_model: AIAnalyzeCompare,
+        format_type: str = 'json',
+    ) -> AIAnalyzeCompare:
+        """
+        Run AI analysis for a single model.
+
+        Args:
+            product: product object
+            ai_analyze_compare_model: per-model comparison record holding the templates
+
+        Returns:
+            an AIAnalyzeCompare object
+        """
+        
+        # logger.info(f"_prepare_competitor_prompt {prompt}")
+        # analysis_result = await llm_product_analysis_service.llm_service.analyze(prompt)
+        # logger.info(f"analysis_result {analysis_result}")
+        # ai_analyze_compare_model.competitor_analyze = AICompetitorAnalyzeMainKeywordsResult(**analysis_result)
+        # ai_analyze_compare_model.competitor_prompt = prompt
+        # product.ai_analysis_compare[ai_analyze_compare_model.model](ai_analyze_compare_model)
+        # await product.save()
+        # return ai_analyze_compare_model
+        pass  # placeholder: a comment-only body would be a syntax error
 async def main():
-    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     manager = ManagerTask()    
     await manager.db_mongo.initialize()
     product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    user = await UserConfig.find_one(UserConfig.user_name == "test_user")
+    competitor_prompting = user.prompting.get("竞品和长尾词分析")
+    marketing_prompting = user.prompting.get("营销文案")
+    ai_analyze_compare = AIAnalyzeCompare(
+        model="openai/deepseek-chat",
+        competitor_template=competitor_prompting, 
+        marketing_template=marketing_prompting
+    )
+    await manager._run_single_analysis(product, ai_analyze_compare)
     # await manager.extract_competitor_analysis(product)
-    await manager.async_analyze_and_save(product, dry_run=False, over_write=True)
+    # await manager.async_analyze_and_save(product, dry_run=False, over_write=True)
     # await manager.submit_search_mainkeyword(product)
     return
+async def run_asinseed_task():
+    manager = ManagerTask()
+    await manager.db_mongo.initialize()
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
+
     for asin in product.competitor_crawl_data.keys():
         logger.info(f"{asin}")
         manager.submit_asinseed_task_and_wait(asin)
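Since `ai_analysis_compare` is declared as `Dict[str, AIAnalyzeCompare]`, keying each entry by model name lets a rerun of the same model replace its previous result while other models' results survive. A stdlib-only sketch of that pattern (the names here are illustrative stand-ins, not the project's API):

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class AnalysisEntry:
    """Illustrative stand-in for AIAnalyzeCompare."""
    model: str
    competitor_analyze: Optional[str] = None

def run_comparison(existing: Dict[str, AnalysisEntry],
                   configs: List[AnalysisEntry],
                   overwrite: bool = False) -> Dict[str, AnalysisEntry]:
    """Key each result by model name; skip the run entirely when results exist."""
    if existing and not overwrite:
        return existing
    results: Dict[str, AnalysisEntry] = {}
    for cfg in configs:
        # a real run would invoke the LLM here and fill in the analysis fields
        results[cfg.model] = cfg
    return results

compare = run_comparison({}, [AnalysisEntry("openai/deepseek-chat"),
                              AnalysisEntry("deepseek-r1")])
print(sorted(compare))  # ['deepseek-r1', 'openai/deepseek-chat']
```

Re-running with the same model name simply overwrites that one key, which is exactly what the dict-based `ai_analysis_compare` field buys over a list.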

+ 36 - 0
src/models/config_model.py

@@ -0,0 +1,36 @@
+from datetime import datetime
+from typing import Dict, List, Optional, Any
+from pydantic import Field, BaseModel
+from beanie import Document, BackLink, Link
+
+class AIPromptConfig(BaseModel):
+    """AI prompt configuration (embedded document)."""
+    name: str = Field(default=None, description="Name")
+    introduce: Optional[str] = Field(default=None, description="Description")
+    template: str = Field(default=None, description="llamaindex prompt template")
+    keywords: Optional[Dict[Any, Any]] = Field(
+        default={},
+        description="Variables used in the llamaindex template"
+    )
+
+
+
+class UserConfig(Document):
+    user_name: str = Field(default=None, description="User name")
+    email: Optional[str] = Field(default=None, description="Email address")
+    prompting: Optional[Dict[str, AIPromptConfig]] = Field(
+        default={},
+        description="Associated AI prompt configurations, keyed by name"
+    )
+    created_at: datetime = Field(default_factory=datetime.now)
+    updated_at: Optional[datetime] = Field(default=None)
+
+    async def update_timestamp(self):
+        """Refresh the document timestamp."""
+        self.updated_at = datetime.now()
+        await self.save()
+
+    async def add_prompting(self, prompting: AIPromptConfig):
+        """Add or replace a prompt configuration, keyed by its name."""
+        self.prompting[prompting.name] = prompting
+        self.updated_at = datetime.now()
+        await self.save()
+        return self
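The `template`/`keywords` pair mirrors llamaindex-style `{variable}` placeholders. A minimal rendering sketch, assuming plain `str.format` placeholders (which is how llamaindex `PromptTemplate` strings are written):

```python
def render_prompt(template: str, keywords: dict) -> str:
    """Fill {name}-style placeholders, failing loudly if a variable is missing."""
    try:
        return template.format(**keywords)
    except KeyError as exc:
        raise ValueError(f"template variable not provided: {exc}") from exc

# Example using the variable-bearing template from this commit's test data.
template = "我在给产品名称为 {product_name} 选主要关键词,选出 {main_key_num} 个。"
print(render_prompt(template, {"product_name": "电线保护套", "main_key_num": 3}))
```

Keeping `keywords` alongside the template in `AIPromptConfig` means a stored config can be rendered without any extra lookup.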

+ 31 - 39
src/models/product_model.py

@@ -1,11 +1,12 @@
 import asyncio
 from datetime import datetime
-from typing import Optional, Dict, List
+from typing import Any, Optional, Dict, List
 from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy import ForeignKeyConstraint
 from pydantic import BaseModel,Field
-from beanie import Document, Indexed, init_beanie
+from beanie import Document, Indexed, init_beanie,Link
 from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
+from .config_model import UserConfig,AIPromptConfig
 
 class MarketingInfo(BaseModel):
     """Marketing information"""
@@ -143,12 +144,30 @@ class AICompetitorAnalyzeMainKeywordsResult(BaseModel):
         default=[],
     )
 
-
-class CompetitorAnalyze(BaseModel):
-    ai_analyze_main_keywords: Optional[List[AICompetitorAnalyzeMainKeywords]] = Field(
-        default=[],
-        description="AI分析的主关键词" 
+class AIAnalyzeCompare(BaseModel):
+    model: str = Field(default=None, description="Model name")
+    competitor_template: Optional[AIPromptConfig] = Field(
+        default=None,
+        description="Linked competitor-analysis prompt template"
+    )
+    competitor_prompt: Optional[str] = Field(
+        default=None,
+        description="Final rendered competitor-analysis prompt"
+    )
+    competitor_analyze: Optional[AICompetitorAnalyzeMainKeywordsResult] = Field(
+        default_factory=AICompetitorAnalyzeMainKeywordsResult,
+        description="Competitor analysis result"
+    )
+    marketing_template: Optional[AIPromptConfig] = Field(
+        default=None,
+        description="Linked marketing prompt template"
+    )
+    marketing: Optional[MarketingInfo] = Field(
+        default=None,
+        description="Marketing info: selling point 1, selling point 2, product-intro style 1, style 2, ...")
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
+    
+
 # 产品主表(核心实体)
 class Product(Document):
     basic_info: Optional[ProductBaseInfo] = Field(
@@ -163,9 +182,13 @@ class Product(Document):
         default={},  # default explicitly set to an empty dict
         description="Competitor analysis info: competitor main-keyword analysis, competitor long-tail keyword analysis, ...")
     competitor_analyze: Optional[AICompetitorAnalyzeMainKeywordsResult] = Field(
-        default=CompetitorAnalyze(),
+        default=AICompetitorAnalyzeMainKeywordsResult(),
         description="" 
     )
+    ai_analysis_compare: Optional[Dict[str, AIAnalyzeCompare]] = Field(
+        default={},
+        description="Comparison of analysis results across different LLMs, keyed by model name"
+    )
     # Variants such as version, model, color, bundle; each variant has its own price, cost, and other financial data
     variants: Optional[List[Variant]] = Field(
         default=None,
@@ -175,34 +198,3 @@ class Product(Document):
     updated_at: Optional[datetime] = Field(default=None)
 
 
-async def main():
-    # Beanie uses Motor async client under the hood 
-    client = AsyncIOMotorClient("mongodb://sv-v2:27017")
-    # 不需要传参数据库名,直接通过 client.test 属性名的方式就能连接或创建 test 数据库
-    # 或者通过 client["test"] 的方式连接或创建 test 数据库
-    await init_beanie(database=client["amazone"], document_models=[Product])
-    # await insert_object()
-    product = Product(
-        basic_info=ProductBaseInfo(
-            name="电线保护套",
-            size="直径6MM,长度93-95CM",
-            weight="30G",
-        ),
-        marketing=MarketingInfo(
-            
-        ),
-        competitor_crawl_data=[
-            CompetitorCrawlData(
-                
-            ) 
-        ],
-        variants=[
-            Variant(),
-        ],
-    )
-    print(product.model_dump_json(indent=2))
-    await product.insert()
-
-if __name__ == "__main__":
-    asyncio.run(main())
-    

+ 0 - 0
tests/mytest/models/t_filed_qwenpy → tests/mytest/models/t_filed_qwen.py


+ 0 - 0
tests/mytest/t_odm_beanie.py → tests/mytest/models/t_odm_beanie.py


+ 138 - 0
tests/mytest/models/t_odm_link.py

@@ -0,0 +1,138 @@
+import asyncio
+from pathlib import Path
+from typing import List,Dict,Any
+
+from beanie import WriteRules,PydanticObjectId
+from config.settings import CFG
+from src.manager.core.db import DbManager,AsinSeed
+from src.manager.core.db_mongo import BaseMongoManager
+from utils.file import save_to_file, read_file
+from config.celery import app
+# Remove direct task imports
+from celery.result import AsyncResult
+from src.models.product_model import (
+    Product,CompetitorCrawlData,AICompetitorAnalyzeMainKeywords, 
+    SearchAmazoneKeyResult, ProductBaseInfo, Variant,AICompetitorAnalyzeMainKeywordsResult,
+    AIAnalyzeCompare
+    )
+from src.models.field_config import FieldConfig
+from utils.logu import get_logger
+from upath import UPath
+logger = get_logger('test')
+from datetime import datetime
+from typing import Dict, List, Optional, Any
+from pydantic import Field, BaseModel
+from beanie import Document, BackLink, Link
+
+class AIPromptConfig(Document):
+    """AI prompt configuration (linked document)."""
+    name: str = Field(default=None, description="Name")
+    introduce: Optional[str] = Field(default=None, description="Description")
+    template: str = Field(default=None, description="llamaindex prompt template")
+    keywords: Optional[Dict[Any, Any]] = Field(
+        default={},
+        description="Variables used in the llamaindex template"
+    )
+    owner: Optional[BackLink["UserConfig"]] = Field(
+        original_field="ai_prompt_configs",
+    )
+    class Settings:
+        name = "configs"  # MongoDB collection name
+
+class UserConfig(Document):
+    user_name: str = Field(default=None, description="User name")
+    email: Optional[str] = Field(default=None, description="Email address")
+    ai_prompt_configs: Optional[List[Link[AIPromptConfig]]] = Field(
+        default=None,
+        description="Linked AI prompt configurations"
+    )
+    created_at: datetime = Field(default_factory=datetime.now)
+    updated_at: Optional[datetime] = Field(default=None)
+
+    class Settings:
+        name = "configs"  # MongoDB collection name
+        use_state_management = True  # enable state management
+
+    async def update_timestamp(self):
+        """Refresh the document timestamp."""
+        self.updated_at = datetime.now()
+        await self.save()
+async def create_user():
+    # Initialize Beanie
+    db_mongo = BaseMongoManager(db_name='test')
+    await db_mongo.initialize()
+    user_config = UserConfig(
+        user_name="test_user",
+        ai_prompt_configs=[
+            AIPromptConfig(
+                name="竞品和长尾词分析",
+                introduce="没有变量",
+                template="""
+我是日本站的亚马逊运营,我在给我的产品选主要关键词和长尾关键词。
+
+请根据以上竞品数据,按以下规则分析:
+- 选出搜索量在1万以上的相同关键词作为主要关键词3个。
+- 如果竞品的搜索量都不足1万,则从排名前十的关键词中筛选3个搜索量最大且相关性最强的词。
+- 结合日本市场特点分析
+- 根据我的产品基本信息,从竞品的主要信息和同类竞品的相似关键词中,筛选出最符合我产品的长尾关键词 tail_keys 12 个(如果存在的话)。
+
+筛选长尾词的示例:
+- 假设我的产品是电线保护,那么竞品关键词中,“隐藏排线管” 就不符合长尾词
+- 假设我的产品是“防老化、防动物咬”用途,你就不能在竞品数据中选择不属于我这个使用场景的长尾关键词。
+""", 
+            )
+        ]
+    )
+    # Whoever holds the Link must do the saving, and must pass link_rule=WriteRules.WRITE
+    await user_config.save(link_rule=WriteRules.WRITE)
+
+async def add_product_prompt():
+    # Initialize Beanie
+    db_mongo = BaseMongoManager(db_name='test')
+    await db_mongo.initialize()
+    # Fetch the user
+    user_config = await UserConfig.find_one(UserConfig.user_name == "test_user", fetch_links=True)
+    marketing = AIPromptConfig(
+                name="营销文案",
+                introduce="没有变量",
+                template="""
+我是日本站的亚马逊运营,帮我给我的产品分析出最佳文案。
+
+请根据以上竞品数据,按以下规则分析:
+- 结合日本市场特点分析
+""", 
+            )
+    marketing.owner = user_config
+    await marketing.save(link_rule=WriteRules.WRITE)
+'''
+❗ Using links and back-links actually adds complexity; a single MongoDB document can be quite large, so external links are not mandatory.
+Drawbacks:
+- Operations are cumbersome: fetch the linked documents first, then the owner, then save everything in the right order
+- Reads must pass the fetch_links parameter, otherwise the linked objects cannot be accessed directly
+- If a document has back-links, printing it dumps everything, so the data is hard to eyeball
+- In a MongoDB GUI client the data cannot be inspected intuitively; the documents are scattered
+- The collections are unclear and the documents end up disorganized
+'''
+async def main():
+    # await add_product_prompt()
+    # return
+    # Initialize Beanie
+    db_mongo = BaseMongoManager(db_name='test')
+    await db_mongo.initialize()
+    # await create_user()
+    # return 
+    # Fetch the user
+    user_config = await UserConfig.find_one(UserConfig.user_name == "test_user")
+    await user_config.fetch_link(UserConfig.ai_prompt_configs)
+    # marketing_prompt = await AIPromptConfig.find_one(AIPromptConfig.name == "营销文案", fetch_links=True)
+    # competitor_prompt = await AIPromptConfig.find_one(AIPromptConfig.name == "竞品和长尾词分析", fetch_links=True)
+    # user_config.ai_prompt_configs = [marketing_prompt,competitor_prompt]
+    # await user_config.save(link_rule=WriteRules.WRITE)
+    # logger.info(f"{marketing_prompt.owner}")
+    # logger.info(f"{competitor_prompt.owner}")
+    logger.info(f"{user_config.ai_prompt_configs}")
+    
+
+if __name__ == "__main__":
+    asyncio.run(main())
+    
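The drawbacks listed in the comment above come down to one extra round-trip (plus a `fetch_links` flag) per link. With plain dicts standing in for the two collection layouts (illustrative data only):

```python
# A store standing in for the linked-document collection.
configs = {"cfg1": {"name": "营销文案", "template": "..."}}

# Linked layout: the user holds only references, so reads need a second lookup
# (roughly what Beanie's fetch_links does for you).
user_linked = {"user_name": "test_user", "ai_prompt_configs": ["cfg1"]}
fetched = [configs[ref] for ref in user_linked["ai_prompt_configs"]]

# Embedded layout: the config lives inside the user document and is readable directly.
user_embedded = {
    "user_name": "test_user",
    "prompting": {"营销文案": {"name": "营销文案", "template": "..."}},
}
direct = user_embedded["prompting"]["营销文案"]

print(fetched[0]["name"] == direct["name"])  # True
```

Both layouts yield the same config; the embedded one just skips the second fetch, which is why `config_model.py` in this commit embeds `AIPromptConfig` instead of linking it.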

+ 0 - 0
tests/mytest/t_odm_mongo_engine.py → tests/mytest/models/t_odm_mongo_engine.py


+ 152 - 0
tests/mytest/models/t_odm_promting_config.py

@@ -0,0 +1,152 @@
+from src.manager.core.db_mongo import BaseMongoManager
+from utils.file import save_to_file, read_file
+from config.celery import app
+# Remove direct task imports
+from celery.result import AsyncResult
+from src.models.product_model import (
+    Product,CompetitorCrawlData,AICompetitorAnalyzeMainKeywords, 
+    SearchAmazoneKeyResult, ProductBaseInfo, Variant,AICompetitorAnalyzeMainKeywordsResult,
+    AIAnalyzeCompare
+    )
+from src.models.field_config import FieldConfig
+from src.models.config_model import (UserConfig, AIPromptConfig)
+from utils.logu import get_logger
+from upath import UPath
+logger = get_logger('test')
+from datetime import datetime
+from typing import Dict, List, Optional, Any
+from pydantic import Field, BaseModel
+from beanie import Document, BackLink, Link
+
+
+import asyncio
+import aiofiles
+import os
+import sys
+
+def get_ai_prompt_config():
+    ai_marketting_prompt_config = AIPromptConfig(
+                name="营销文案",
+                introduce="没有变量",
+                template="""
+我是日本站的亚马逊运营,帮我给我的产品分析出最佳文案。
+
+请根据以上数据,按以下规则分析:
+- 结合日本市场特点分析
+""", 
+    )
+    ai_competitor_promting = AIPromptConfig(
+                name="竞品和长尾词分析",
+                introduce="没有变量",
+                template="""
+我是日本站的亚马逊运营,我在给我的产品选主要关键词和长尾关键词。
+
+请根据以上竞品数据,按以下规则分析:
+- 选出搜索量在1万以上的相同关键词作为主要关键词3个。
+- 如果竞品的搜索量都不足1万,则从排名前十的关键词中筛选3个搜索量最大且相关性最强的词。
+- 结合日本市场特点分析
+- 根据我的产品基本信息,从竞品的主要信息和同类竞品的相似关键词中,筛选出最符合我产品的长尾关键词 tail_keys 12 个(如果存在的话)。
+
+筛选长尾词的示例:
+- 假设我的产品是电线保护,那么竞品关键词中,“隐藏排线管” 就不符合长尾词
+- 假设我的产品是“防老化、防动物咬”用途,你就不能在竞品数据中选择不属于我这个使用场景的长尾关键词。
+""", 
+            )
+    return ai_marketting_prompt_config,ai_competitor_promting
+async def create_user():
+    db_mongo = BaseMongoManager()
+    await db_mongo.initialize()
+    user = await UserConfig.find_one(UserConfig.user_name == "test_user")
+    if user:
+        return user
+    user = UserConfig(
+        user_name="test_user",
+    )
+    await user.save()
+    return user
+
+async def add_promt():
+    db_mongo = BaseMongoManager()
+    await db_mongo.initialize()
+    await create_user()
+    user = await UserConfig.find_one(UserConfig.user_name == "test_user")
+    p = AIPromptConfig(
+        name="竞品和长尾词分析-变量",
+        template="""
+我是日本站的亚马逊运营,我在给产品名称为 {product_name} 选主要关键词和长尾关键词。
+
+请根据以上 {len_competitor_data} 个竞品数据,按以下规则分析:
+- 选出搜索量在1万以上的相同关键词作为主要关键词 {main_key_num} 个。
+- 如果竞品的搜索量都不足1万,则从排名前十的关键词中筛选 {main_key_num} 个搜索量最大且相关性最强的词。
+- 结合日本市场特点分析
+- 根据我的产品基本信息,从竞品的主要信息和同类竞品的相似关键词中,筛选出最符合我产品的长尾关键词 tail_keys {tail_key_num} 个以上
+
+筛选长尾词的示例:
+- 假设我的产品是电线保护,那么竞品关键词中,“隐藏排线管” 就不符合长尾词
+- 假设我的产品是“防老化、防动物咬”用途,你就不能在竞品数据中选择不属于我这个使用场景的长尾关键词。
+"""
+    )
+    p1,p2 = get_ai_prompt_config()
+    await user.add_prompting(p)
+    await user.add_prompting(p1)
+    await user.add_prompting(p2)
+# Mainly for querying list elements; however, it returns plain dicts, which is not very ergonomic
+async def find_demo():
+    db_mongo = BaseMongoManager(db_name='test')
+    await db_mongo.initialize()
+    user = await UserConfig.find_one(UserConfig.user_name == "test_user")
+    # Find all users that contain the "营销文案" (marketing copy) config
+    pipeline = [
+        {
+            "$match": {
+                "prompting.marketing.name": "营销文案"
+            }
+        },
+        {
+            "$project": {
+                "results": {
+                    "$filter": {
+                        "input": "$prompting.marketing",
+                        "as": "item",
+                        "cond": { "$eq": ["$$item.name", "营销文案"] }
+                    }
+                }
+            }
+        }
+    ]
+    
+    # 使用 aggregate 方法执行管道
+    result = await UserConfig.aggregate(pipeline).to_list()
+    logger.info(f"{result}")
+
+    pipeline = [
+        {
+            "$match": {
+                "prompting.competitor.name": "竞品和长尾词分析"
+            }
+        },
+        {
+            "$project": {
+                "results": {
+                    "$slice": [
+                        {
+                            "$filter": {
+                                "input": "$prompting.competitor",
+                                "as": "item",
+                                "cond": { "$eq": ["$$item.name", "竞品和长尾词分析"] }
+                            }
+                        },
+                        1
+                    ]
+                }
+            }
+        }
+    ]
+    
+    # Run the aggregation query with the corrected pipeline structure
+    result = await UserConfig.aggregate(pipeline).to_list()
+    logger.info(f"{result}")
+def main():
+    asyncio.run(add_promt())
+
+if __name__ == "__main__":
+    main()
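To sanity-check what the `$match`/`$filter` pipeline above should return, the same selection can be mirrored in plain Python over illustrative documents (the field paths assume `prompting.<key>` holds a list of configs, as the pipeline does):

```python
from typing import Optional

def filter_prompts(docs, group_key: str, wanted_name: str, limit: Optional[int] = None):
    """Mimic $match + $filter (+ optional $slice): keep configs whose name matches."""
    results = []
    for doc in docs:
        items = doc.get("prompting", {}).get(group_key, [])
        matched = [item for item in items if item.get("name") == wanted_name]
        if matched:  # $match drops documents without a hit
            results.append(matched if limit is None else matched[:limit])
    return results

docs = [
    {"prompting": {"marketing": [{"name": "营销文案"}, {"name": "其他"}]}},
    {"prompting": {"marketing": [{"name": "其他"}]}},
]
print(filter_prompts(docs, "marketing", "营销文案"))  # [[{'name': '营销文案'}]]
```

This also makes the mismatch visible: with `prompting` stored as `Dict[str, AIPromptConfig]` (keyed by name), the `prompting.marketing` array paths used in the pipeline would not match, which is part of why the comment calls the aggregation approach unfriendly.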