Browse Source

AI竞品分析新增 dry_run 模式

mrh 8 tháng trước cách đây
mục cha
commit
3f18a4be2a
3 tập tin đã thay đổi với 40 bổ sung68 xóa
  1. 10 0
      docs/gpt/agent_product.md
  2. 18 60
      src/ai/agent_product.py
  3. 12 8
      src/manager/manager_task.py

+ 10 - 0
docs/gpt/agent_product.md

@@ -1,3 +1,13 @@
+# 关键词生成
+官方定义推荐关键词:
+保持简洁:虽然 200 个字符是允许的最大字符数,但我们建议您使用 80 个或更少字符,因为手机屏幕会缩短较长的商品名称避免冗余:请勿在商品名称中包含冗余信息、不必要的同义词或过多的关键词优化词序:仅包含有助于买家快速识别和了解商品的信息,将词语排序以优先展示最重要的商品信息。如果适用,您可以考虑以下顺序:品牌名称-口味/款式-商品类型名称-关键属性(即商品的唯一销售主张)-颜色-尺寸/包装数量-型号。
+用户可自行附加额外的内容:我一般会加用途,卖点。其实“商品的唯一销售主张”就是卖点。
+----
+例如,在某个竞品的产品信息中(product_info.main_text)“MEL Chemistry大径 肉厚 ペットコード ペット 犬 猫 キャット ドッグ 噛みつき 防止 感電 保護 家電 チャージ コード 配線 プロテクター カバー 螺旋 スパイラル チューブ ラップ 被覆 破れ 防止 破損防止 補強 収納 収束 結束 まとめる TPU 約93cm (ブラック 黒)B0B658JC22”
+- 属性词是: 大径 肉厚 噛みつき 防止 破損防止 螺旋
+
+
+
 # agent_product 存入 mongodb
 @/src\ai\agent_product.py 
 @/src\models\product_model.py 

+ 18 - 60
src/ai/agent_product.py

@@ -150,63 +150,6 @@ class FormatterFactory:
             return HumanFormatter(notes)
         raise ValueError(f"Unsupported format type: {format_type}")
 
-async def test_product_mongo(main_key_num=3, format_type: str = "json"):
-    db_mongo = BaseMongoManager()  # 自动初始化
-    product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    product_name = product.basic_info.name
-    # 使用默认配置
-    competitor_data = get_competitor_prompt_data(product)
-    competitor_desc = get_field_descriptions(CompetitorCrawlData)
-    product_info_desc = get_field_descriptions(ProductImageInfo)
-    keyword_result_desc = get_field_descriptions(TrafficKeywordResult)
-    
-    # 定义输出字段描述
-    output_fields = {
-        "results":{
-            "asin": "商品(竞品)编号",
-            "main_key": "主要关键词",
-            "monthly_searches": "月搜索量",
-            "reason": "分析你选出的关键词理由"
-        },
-        "supplement": "非必填,仅当你觉得有必要时才提供补充说明。"
-    }
-    # 格式特定的额外说明信息
-    notes = {
-        # "json": "json 信息时必须",
-        # "human": "人类可读信息时必须"
-    }
-    
-    # 生成输出格式
-    output_format = format_output(output_fields, format_type, notes)
-    
-    logger.info(f"competitor_desc {competitor_desc}")
-    logger.info(f"product_info_desc {product_info_desc}")
-    logger.info(f"keyword_result_desc {keyword_result_desc}")
-    
-    analyz_main_keyword_template_str = '''
-各个字段说明:
-{desc}
-
-竞品数据:
-{competitor_data}
-----
-我是日本站的亚马逊运营,我在给产品名称为 {product_name} 选主要关键词,以上数据是我从同类竞品的关键词搜索量数据,总共有 {competitor_count} 个竞品数据。
-请帮我分析这些竞品数据,选出搜索量在1万以上的相同关键词作为主要关键词3个。
-如果竞品的搜索量都不足1万,则从排名前十的关键词中筛选 {main_key_num} 个搜索量最大且相关性最强的词。
-建议结合日本的市场和用户习惯来分析关键词。
-请输出以下格式:
-{output_format}
-'''
-    text_qa_template = PromptTemplate(analyz_main_keyword_template_str)
-    formatted_output = text_qa_template.format(
-        desc=(competitor_desc, product_info_desc, keyword_result_desc),
-        product_name=product_name,
-        competitor_data=competitor_data,
-        competitor_count=len(competitor_data),
-        output_format=output_format,
-    )
-    logger.info(formatted_output)
-    return formatted_output
 
 class LLMService:
     """LLM服务抽象类"""
@@ -267,11 +210,26 @@ class AnalysisService:
         self.llm_service = llm_service
         self.db_manager = db_manager
 
-    async def execute_analysis(self, product:Product, format_type: str = "json") -> Union[dict, str]:
+    async def execute_analysis(self, product:Product, format_type: str = "json", dry_run=False) -> tuple[dict, str]:
         prompt = await self._prepare_prompt(product, format_type)
-        return await self.llm_service.analyze(prompt)
+        logger.info(f"prompt: {prompt}")
+        
+        if dry_run:
+            mock_result = {
+                "results": [{
+                    "asin": "MOCK_ASIN",
+                    "main_key": "MOCK_KEYWORD",
+                    "monthly_searches": "1,000",
+                    "reason": "模拟分析结果"
+                }],
+                "supplement": "模拟补充信息"
+            }
+            return mock_result, prompt
+            
+        analysis_result = await self.llm_service.analyze(prompt)
+        return analysis_result, prompt
 
-    async def _prepare_prompt(self, product: Product, format_type: str, main_key_num=3) -> str:
+    async def _prepare_prompt(self, product: Product, format_type: str = "json", main_key_num: int = 3) -> str:
         competitor_data = get_competitor_prompt_data(product)
         output_fields = {
             "results": {

+ 12 - 8
src/manager/manager_task.py

@@ -178,10 +178,10 @@ class ManagerTask:
         await product.save()
         
 
-    async def async_analyze_and_save(self, product: Product):
-        if product.competitor_analyze:
+    async def async_analyze_and_save(self, product: Product, over_write:bool=False, dry_run: bool = False):
+        if not over_write and product.competitor_analyze:
             logger.info(f"产品 {product.basic_info.name} 已分析过,跳过")
-            return 
+            return None
         """异步执行AI分析并保存结果到数据库"""
         try:
             from src.ai.agent_product import AnalysisService, LiteLLMService
@@ -192,9 +192,10 @@ class ManagerTask:
             llm_service = LiteLLMService(format_type='json')
             analysis_service = AnalysisService(llm_service, self.db_mongo)
             
-            # 执行分析
-            analyze_result = await analysis_service.execute_analysis(product)
             
+            # 执行实际分析
+            analyze_result, prompt = await analysis_service.execute_analysis(product)
+            logger.info(f"提示词:{prompt}")
             logger.info(f"分析结果: {analyze_result}")
             # 转换结果格式
             results = [
@@ -211,7 +212,9 @@ class ManagerTask:
                 results=results,
                 supplement=analyze_result.get("supplement")
             )
-            
+            if dry_run:
+                logger.info(f"{product.competitor_analyze.model_dump_json(indent=2, ensure_ascii=False)}")
+                return 
             # 保存到数据库
             await product.save()
             logger.info("竞品分析结果已成功保存到MongoDB")
@@ -219,6 +222,7 @@ class ManagerTask:
         except Exception as e:
             logger.error(f"保存分析结果失败: {str(e)}")
             logger.exception("完整错误堆栈:")
+            return None
 async def test_product_mongo():
     manager = ManagerTask()    
     await manager.db_mongo.initialize()
@@ -251,8 +255,8 @@ async def main():
     await manager.db_mongo.initialize()
     product = await Product.find_one(Product.basic_info.name == "电线保护套")
     # await manager.extract_competitor_analysis(product)
-    # await manager.async_analyze_and_save(product)
-    await manager.submit_search_mainkeyword(product)
+    await manager.async_analyze_and_save(product, dry_run=True, over_write=True)
+    # await manager.submit_search_mainkeyword(product)
     return
     for asin in product.competitor_crawl_data.keys():
         logger.info(f"{asin}")