3 Commits faea3ddfea ... 1450c35a05

Author SHA1 Message Date
  mrh 1450c35a05 Merge branch 'master_back' 11 months ago
  mrh 9820ba46f2 Modify the product crawl result fields and complete the migration; add a main-keyword analysis field. Back up the database to I:\eng\backup\mongo\20250326_02_05_03 11 months ago
  mrh 04c29fb750 Add database backups. Fix the bug where competitor crawl images were downloaded to the wrong S3 path. 11 months ago

+ 1 - 0
.gitignore

@@ -11,3 +11,4 @@ wheels/
 output/
 .env
 .aider*
+.history

+ 15 - 2
README.md

@@ -5,9 +5,22 @@ uv pip install universal_pathlib
 Documentation: https://www.mongodb.com/docs/database-tools/
 ```shell
 cd G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin
-mongodump --uri="mongodb://sv-v2:27017" -o G:\code\amazone\copywriting_production\output\mongodb_backup
+
+$timestr = Get-Date -Format "yyyyMMdd_HH_mm_ss";G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.exe --uri="mongodb://sv-v2:27017" -o I:\eng\backup\mongo\$timestr
+
 # Load the env variables first; see J:\eng-sftp\nextcloud\eng\my-document\restic.md
 restic backup G:\code\amazone\copywriting_production\output\mongodb_backup
-restic snapshots
+restic snapshots -H pc
 restic ls -l 04c9313f
+```
+
+## Back up a collection to another database
+```shell
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.exe --uri="mongodb://sv-v2:27017/amazone" --collection=Product -o G:\code\amazone\copywriting_production\output\temp
+$timestr = Get-Date -Format "yyyyMMdd_HH_mm_ss"
+echo $timestr
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/" --nsInclude="amazone.Product" --nsFrom="amazone.Product" --nsTo="backup_amazone.Product123" G:\code\amazone\copywriting_production\output\temp\amazone\Product.bson --drop
+
+G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongorestore.exe --uri="mongodb://sv-v2:27017/backup_amazone" --collection=Product G:\code\amazone\copywriting_production\output\temp
+mongoimport --uri="mongodb://sv-v2:27017" --collection=users --file=G:\code\amazone\copywriting_production\output\mongodb_backup\users.json
 ```

+ 16 - 0
docs/gpt/数据库架构师.md

@@ -1,3 +1,19 @@
+# AI analysis
+User:
+I plan to build an operations tool in Python. It needs object storage, a database, and dynamic updates of JSON data. The content involved includes tables, images, HTML pages, product numbers, keywords, product information, saved product copy, and long-tail keywords. Some of this data is scraped from websites, some is generated by large language models, some tables are produced by computation and consolidation, some Markdown documents are generated by computation and by embedded links, and some data consists of image screenshots. None of it is fixed: when customer requirements change I have to add new fields, images, tables, Markdown documents, docx documents, mhtml files, and so on. The LLM generation also uses embedding vectors.
+
+Product is a model I have already defined. Later the competitor_crawl_data will be sent to an LLM so it can pick the best keywords, and the analysis results will be stored in competitor_analyze. For example, a user-defined prompt:
+```txt
+I am an Amazon operator choosing main keywords for a product named "cable protector sleeve". The attachment contains keyword search-volume data from comparable competitor products. For example, B0B658JC22 is the first competitor: its column lists its keywords, and the second column is each keyword's monthly search volume. All of that product's information is included at the end.
+To the right, the second product B0CQ1SHD8V follows the same layout. Pick 3 keywords shared by both products with a monthly search volume above 10,000 to use as my product's main keywords.
+```
+The LLM usually returns JSON; for now I want it to return two fields: the main keyword and its search volume.
+
+Next, I need to run a web search for each main keyword and save the results into SearchAmazoneKeyResult.
+
+However, I am not sure whether the CompetitorAnalyze.ai_analyze_main_keywords level of nesting should be flattened, since the AI analysis may also cover other things, such as prompt rules, keyword filtering rules, or other references. In an agent, should fields such as configuration or output results also be kept?
+
+
 # Describing MongoDB field metadata
 User:
 For a MongoDB definition like the one above, I find that sometimes a field's description also needs to be stored in the database: right now only this project writes the data, and if someone else reads it straight from the database (a client application, for example) they will not know what each field means. In terms of best-practice design, should I define a separate description for every field, or is there a better approach?
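One possible direction for the metadata question, as a minimal sketch (the model and collection names here are illustrative, not the project's actual definitions): keep the descriptions on the Pydantic/Beanie fields and publish the generated JSON schema into a small metadata collection, so any client reading the database can look the field meanings up.

```python
from typing import Optional
from pydantic import BaseModel, Field

# Illustrative model only; the real models live in src/models/product_model.py.
class ExampleInfo(BaseModel):
    name: Optional[str] = Field(default=None, description="Product name shown to operators")
    asin: Optional[str] = Field(default=None, description="Amazon ASIN")

# Pydantic keeps every Field description in the generated JSON schema, so one
# document per model in a metadata collection is enough for other readers:
schema_doc = {"model": "ExampleInfo", "schema": ExampleInfo.model_json_schema()}
# e.g. await db["_schema_metadata"].replace_one({"model": "ExampleInfo"}, schema_doc, upsert=True)
```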

+ 44 - 0
src/ai/agent_product.py

@@ -0,0 +1,44 @@
+from llama_index.core import PromptTemplate
+
+import asyncio
+import aiofiles
+import os
+import sys
+from src.models.product_model import Product,CompetitorCrawlData,AICompetitorAnalyzeMainKeywords, SearchAmazoneKeyResult, ProductBaseInfo, Variant
+from src.manager.core.db_mongo import BaseMongoManager
+from utils.logu import get_logger
+logger = get_logger('ai')
+async def task():
+    db_mongo = BaseMongoManager()
+    await db_mongo.initialize()
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    competitor_crawl_data = product.competitor_crawl_data
+    for asin,crawl_data in competitor_crawl_data.items():
+        logger.info(f"{asin}")
+        logger.info(f"{crawl_data.extra_result.product_info}")
+        logger.info(f"{crawl_data.extra_result.result_table}")
+        break
+    return
+
+async def test_product_mongo():
+    db_mongo = BaseMongoManager()
+    await db_mongo.initialize()
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    for crawl_data in product.competitor_crawl_data.values():
+        logger.info(f"{crawl_data.extra_result}")
+        break
+    analyze_main_keyword_template_str = '''
+    我是亚马逊运营,我在给产品名称为数据线保护套选主要关键词,附件中是我从同类竞品的关键词搜索量数据。
+    例如 B0B658JC22 是第一个商品的竞品数据,它所在的列是它的关键词,第二列是该竞品搜索关键词月搜索量。
+    末尾包含了该产品的所有信息。
+    往右是第二个商品B0CQ1SHD8V 也是一样。帮我选出他们两个的相同关键词并且搜索量在1万以上来作为我产品的主要关键词3个。
+    如果竞品的所有关键词搜索量都没有达到1万以上的话,就从排名前十的关键词里筛选三个搜索量最大、相关性最强的词作为主关键词。
+    '''
+    text_qa_template = PromptTemplate(analyze_main_keyword_template_str)
+
+
+def main():
+    asyncio.run(task())
+
+if __name__ == "__main__":
+    main()
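The test above stops after constructing the PromptTemplate. A minimal sketch of the step that would follow, assuming an OpenAI-backed LLM through llama_index (the model name, the helper name, and the flattening of the crawl data into text are illustrative, not part of this commit):

```python
import json
from llama_index.core import PromptTemplate
from llama_index.llms.openai import OpenAI  # assumes the llama-index-llms-openai package

async def analyze_main_keywords(instruction: str, competitor_text: str) -> dict:
    """Send the instruction plus the flattened competitor keyword table to an LLM
    and parse the JSON it returns (main keyword + monthly search volume)."""
    template = PromptTemplate(instruction + "\n\n{data}\n\nReturn JSON with main_keyword and monthly_searches.")
    llm = OpenAI(model="gpt-4o-mini")  # illustrative model choice
    response = await llm.acomplete(template.format(data=competitor_text))
    return json.loads(response.text)   # assumes the model answers with bare JSON
```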

+ 11 - 11
src/browser/crawl_amz_search_key.py

@@ -20,19 +20,19 @@ from utils.drission_page import load_chrome_from_ini,ChromeOptions
 from utils.file import save_to_file,check_exists,s3,read_file,s3_uri_to_http_url
 from config.settings import CFG
 from src.browser.crawl_base import CrawlerBase
-
+from src.models.product_model import Product,CompetitorCrawlData,SearchAmazoneKeyResult
 logger = get_logger('browser')
 AMZ_HTML_DIR = OUTPUT_DIR / 'page' / 'amz'
 AMZ_HTML_DIR.mkdir(parents=True, exist_ok=True)
 
-class SearchKeyResult(BaseModel):
-    suggestions:List[str] = []
-    search_key:Optional[str] = None
-    mhtml_path:Optional[str] = None
-    screenshot:Optional[str] = None
-    error:Optional[int] = None
-    msg:Optional[str] = None
-    created_at:Optional[datetime] = Field(default_factory=datetime.now)
+# class SearchAmazoneKeyResult(BaseModel):
+#     suggestions:List[str] = []
+#     search_key:Optional[str] = None
+#     mhtml_path:Optional[str] = None
+#     screenshot:Optional[str] = None
+#     error:Optional[int] = None
+#     msg:Optional[str] = None
+#     created_at:Optional[datetime] = Field(default_factory=datetime.now)
 
 class CrawlerSearchKeyInput(BaseModel):
     search_key:str
@@ -64,7 +64,7 @@ class CrawlerAmzSearchKey(CrawlerBase):
         input_box.input(search_key)
         suggestion_ele_list = self.tab.s_ele('xpath://input[@id="sac-autocomplete-results-container"]', timeout=3)
 
-    async def crawl_suggestion(self, search_input:CrawlerSearchKeyInput) -> SearchKeyResult:
+    async def crawl_suggestion(self, search_input:CrawlerSearchKeyInput) -> SearchAmazoneKeyResult:
         if not check_exists(search_input.mhtml_path) or search_input.overwrite:
             await asyncio.to_thread(self.search_key_and_save_page, search_input.search_key)
             save_mhtml_path,temp_mhtml_path = self.save_current_page(self.tab, search_input.mhtml_path, after_unlink=False)
@@ -99,7 +99,7 @@ class CrawlerAmzSearchKey(CrawlerBase):
         result = await self.excra_strategy_raw_html(html_str, schema=schema)
         data = json.loads(result.extracted_content)
         logger.debug(f"{result.extracted_content}")
-        search_key_result = SearchKeyResult(search_key=search_input.search_key, mhtml_path=search_input.mhtml_path, screenshot=str(search_input.screenshot_path))
+        search_key_result = SearchAmazoneKeyResult(search_key=search_input.search_key, mhtml_path=search_input.mhtml_path, screenshot=str(search_input.screenshot_path))
         suggestions = []
         if len(data) == 0:
             msg = f"{search_input.search_key} has no suggestions, temp_mhtml_path {temp_mhtml_path}"

+ 2 - 2
src/browser/crawl_asin.py

@@ -177,12 +177,12 @@ class Crawler(CrawlerBase):
         data = await self.excra_product_info(html_content)
         if data['product_info'].get('image_url'):
             img_name = UPath(data['product_info']['image_url']).name
-            upload_s3_dir = str(UPath(upload_s3_dir) / img_name)
+            img_path = str(UPath(upload_s3_dir) / img_name)
             logger.info(f"upload_s3_dir {upload_s3_dir}")
             status,save_img_path = await asyncio.to_thread(self.download_img, 
                 data['product_info']['image_url'],
                 as_img_base64=False,
-                upload_s3_dir=upload_s3_dir)
+                img_path=img_path)
         data['product_info']['img_path'] = save_img_path
         logger.info(f"{json.dumps(data, indent=4,ensure_ascii=False)}")
         return data

+ 4 - 7
src/browser/crawl_base.py

@@ -90,7 +90,7 @@ class CrawlerBase():
                 )
             )
             return result
-    def download_img(self,url:str,save_dir:str=TEMP_PAGE_DIR, page:str=None,as_img_base64:bool=True, upload_s3_dir:str=''):
+    def download_img(self,url:str,save_dir:str=TEMP_PAGE_DIR, page:str=None,as_img_base64:bool=True, img_path:str=''):
         # ('success', '{abs_current_path}\\notice.svg')
         p = page or self.page
         status,path = p.download(url, save_path=save_dir)
@@ -103,14 +103,11 @@ class CrawlerBase():
                 Path(path).unlink()
                 # dataUrl = f"data:image/svg+xml;base64,{encoded_string}"
                 return status,encoded_string
-            if upload_s3_dir:
-                if not upload_s3_dir.endswith('/'):
-                    upload_s3_dir += '/'
-                save_img_path = f"{upload_s3_dir}{path_name}"
+            if img_path:
                 with open(path, 'rb') as f:
-                    save_to_file(f.read(), save_img_path)
+                    save_to_file(f.read(), img_path)
                 Path(path).unlink()
-                return status,save_img_path
+                return status,img_path
         return status,path
     async def run(self, url:str):
         page = load_chrome_from_ini(
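Before this change the caller had already reassigned upload_s3_dir to the full file path, and download_img then appended the file name again, so the S3 key ended up with the image name duplicated. Now the caller builds the destination path once and download_img writes to it verbatim. A small usage sketch with illustrative values:

```python
from upath import UPath
from src.browser.crawl_base import CrawlerBase

def build_img_path(upload_s3_dir: str, image_url: str) -> str:
    # e.g. "s3://bucket/output/asin/B0B658JC22" + "main.jpg"
    #   -> "s3://bucket/output/asin/B0B658JC22/main.jpg"
    return str(UPath(upload_s3_dir) / UPath(image_url).name)

def download_to_s3(crawler: CrawlerBase, upload_s3_dir: str, image_url: str):
    img_path = build_img_path(upload_s3_dir, image_url)
    return crawler.download_img(image_url, as_img_base64=False, img_path=img_path)
```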

+ 14 - 14
src/excel_tools/file_manager.py

@@ -1,3 +1,4 @@
+import asyncio
 import json
 from pathlib import Path
 import shutil
@@ -13,7 +14,8 @@ from src.manager import DbManager,StorageManager
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
 from src.models.asin_model import AsinExtraResultModel
-
+from src.models.product_model import Product,CompetitorCrawlData
+from src.manager.core.db_mongo import BaseMongoManager
 
 logger = get_logger('excel')
 
@@ -38,28 +40,26 @@ class ExcelFileManager:
 
     def save_all(self):
         self.wb.save(self.output_path)
+        logger.info(f"{self.output_path}")
         self.wb.close()
     
-    def write_competitive_sheet(self, extract_data:dict, sheet_name: str = "竞品关键词调研", sheet_index: int = 0, overwrite: bool = False):
+    def write_competitive_sheet(self, competitor_crawl_data:Dict[str, CompetitorCrawlData], sheet_name: str = "竞品关键词调研", sheet_index: int = 0, overwrite: bool = False):
         if overwrite and sheet_name in self.wb.sheetnames:
             self.wb.remove(self.wb[sheet_name])
         if sheet_name not in self.wb.sheetnames:
             competitive_sheet_writer = CompetitiveAnalysisWriter(self.wb, sheet_name=sheet_name, sheet_index=sheet_index)
-            competitive_sheet_writer.add_data(extract_data)
+            competitive_sheet_writer.add_data(competitor_crawl_data)
     def load_s3_extract_data(self) -> list[AsinExtraResultModel]:
         return self.s3_storage_manager.load_s3_complete_extract_data()
 
-def main():
+async def main():
     self = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\multi_data.xlsx")
-    extract_data_lsit = self.load_s3_extract_data()
-    logger.info(f"{extract_data_lsit}")
-    dict_list = [model.model_dump() for model in extract_data_lsit]
-    # 使用 json.dumps() 将字典列表转换为 JSON 字符串
-    json_str = json.dumps(dict_list, indent=4, ensure_ascii=False)
-    save_to_file(json_str, OUTPUT_DIR / "multi_data.json")
-    logger.info(f"{len(extract_data_lsit)}")
-    # self.write_competie_sheet(extract_data)
-    # self.save_all()
+    db_mongo = BaseMongoManager()
+    await db_mongo.initialize()
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    extract_data_list = product.competitor_crawl_data
+    self.write_competitive_sheet(extract_data_list)
+    self.save_all()
     return
     competi_sheet = CompetitiveAnalysisWriter(excel_file.output_path)
 
@@ -86,4 +86,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())

+ 27 - 12
src/excel_tools/writers/competitive_analysis.py

@@ -13,6 +13,8 @@ from utils.file import read_file
 from utils.logu import get_logger
 from openpyxl import load_workbook,Workbook
 from .base_writer import ExcelWriterBase
+from src.models.product_model import Product,CompetitorCrawlData
+
 
 logger = get_logger('excel')
 
@@ -82,24 +84,37 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
             self.ws = self.wb.create_sheet(self.sheet_name, index=self.sheet_index)
             logger.info(f"新建工作表: {self.sheet_name}")
 
-    def add_data(self, data: List[Dict[str, Any]]):
-        for product_data in data:
-            logger.info(f"{product_data['asin']}, 处理中...")
-            self.add_product(product_data['extra_result_data'], product_data['asin'])
+    def add_data(self, competitor_crawl_data: Dict[str, CompetitorCrawlData]):
+            
+        for asin, analysis in competitor_crawl_data.items():
+            logger.info(f"正在处理竞品 {asin}...")
+            self.add_product(analysis)
         
         self.apply_formatting()
 
-    def add_product(self, data: dict, asin: str):
-        """添加产品数据"""
+    def add_product(self, analysis: CompetitorCrawlData) -> None:
+        """添加竞品分析数据
+        Args:
+            analysis (CompetitorCrawlData): 竞品分析数据对象,必须包含extra_result和asin字段
+        """
         try:
-            # 加载并处理数据
-            processor = ProductDataProcessor(data, asin)
+            # Validate arguments
+            if not analysis.extra_result:
+                raise ValueError(f"竞品分析数据缺失: {analysis.asin}")
+            if not analysis.asin:
+                raise ValueError("竞品ASIN不能为空")
+                
+            # Initialize the data processor
+            processor = ProductDataProcessor(
+                json_data=analysis.extra_result,
+                asin=analysis.asin
+            )
             
             # Record the product's starting column
             self.product_cols.append(self.current_col)
             
-            # 写入主数据表
-            self._write_main_table(processor, asin)
+            # Write the main data table (using the ASIN from the competitor data)
+            self._write_main_table(processor, analysis.asin)
             
             # Write additional info
             self._write_additional_info(processor)
@@ -110,8 +125,8 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
             # Move to the next column group
             self.current_col += self.COLUMN_SPACING
             
-        except (json.JSONDecodeError, ValueError) as e:
-            logger.error(f'Error processing {data}: {e}')
+        except (ValueError, KeyError) as e:
+            logger.error(f'处理竞品 {analysis.asin} 数据失败: {e}')
 
     
     def _write_main_table(self, processor: ProductDataProcessor, asin: str):

+ 15 - 4
src/manager/core/db_mongo.py

@@ -1,3 +1,4 @@
+from datetime import datetime
 from typing import Optional, List
 from beanie import Document, Indexed, init_beanie
 from beanie.odm.operators.update.general import Set
@@ -5,7 +6,7 @@ from pydantic import BaseModel
 from motor.motor_asyncio import AsyncIOMotorClient
 from config.settings import MONGO_URL, MONGO_DB_NAME
 from src.models.product_model import Product
-
+from beanie.operators import Set, Rename
 
 class BaseMongoManager:
     def __init__(self, mongo_url: str = None, db_name: str = None):
@@ -14,6 +15,11 @@ class BaseMongoManager:
     async def initialize(self):
         await init_beanie(database=self.db, document_models=[Product])
 
+    async def backup(self, source_collection_name: Document, backup_collection_name:str, backup_db_name: str = f"{MONGO_DB_NAME}_backup"):
+        backup_db = self.client[backup_db_name]
+        backup_collection = backup_db[backup_collection_name]
+        await backup_collection.insert_many(await source_collection_name.find().to_list())
+
 class DbManagerMongo(BaseMongoManager):
     pass
 
@@ -26,13 +32,18 @@ class ProductManagerMongo(BaseMongoManager):
 
     async def get_product_by_name(self, name: str) -> Optional[Product]:
         return await Product.find_one(Product.basic_info.name == name)
-
+    @staticmethod
+    async def migrate_field():
+        # Rename the field competitor_analysis to competitor_crawl_data
+        await Product.find(
+            {"competitor_analysis": {"$exists": True}}
+        ).update(
+            Rename({"competitor_analysis": "competitor_crawl_data"})  # takes a dict argument
+        )
 async def main():
     # Initialization example
     db_manager = DbManagerMongo()
     await db_manager.initialize()
-    product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    print(product)
+    await ProductManagerMongo.migrate_field()
     # Usage example
     # New Product operation examples can be added here
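The new backup() helper copies documents through the Beanie document class. For reference, a raw-motor sketch that copies the stored documents as plain dicts, so _id values and BSON types survive unchanged, mirroring the mongodump/mongorestore flow in the README (database and collection names are illustrative):

```python
from motor.motor_asyncio import AsyncIOMotorClient
from config.settings import MONGO_URL, MONGO_DB_NAME

async def copy_collection_raw(collection_name: str = "Product") -> int:
    """Copy one collection into <MONGO_DB_NAME>_backup using plain motor."""
    client = AsyncIOMotorClient(MONGO_URL)
    docs = await client[MONGO_DB_NAME][collection_name].find().to_list(length=None)
    if docs:
        await client[f"{MONGO_DB_NAME}_backup"][collection_name].insert_many(docs)
    return len(docs)
```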
 

+ 46 - 14
src/manager/manager_task.py

@@ -1,4 +1,5 @@
 import asyncio
+from datetime import datetime
 import json
 from pathlib import Path
 from config.settings import CFG
@@ -8,7 +9,7 @@ from utils.file import save_to_file, read_file
 from config.celery import app
 # Remove direct task imports
 from celery.result import AsyncResult
-from src.models.product_model import Product,CompetitorAnalysis
+from src.models.product_model import Product,CompetitorCrawlData,AICompetitorAnalyzeMainKeywords, SearchAmazoneKeyResult, ProductBaseInfo, Variant
 from utils.logu import get_logger
 from upath import UPath
 logger = get_logger('main')
@@ -85,13 +86,44 @@ class ManagerTask:
             else:
                 self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
             return asin_seed
-    def submit_suggestion_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
-        """提交页面解析任务并等待完成,保存结果到数据库"""
-        # 从数据库获取mhtml路径
-        asin_seed = self.db.get_asin_seed(asin)
-        if asin_seed and asin_seed.extra_result_path:
-            logger.info(f"{asin}已经解析过,跳过")
-
+    def submit_suggestion_task_and_wait(self, product: Product, search_key: str, timeout: int = 300):
+        """提交关键词建议任务并等待完成,保存结果到数据库"""
+        # 检查是否已有缓存结果
+        existing = next((item for item in product.competitor_analyze if item.search_key == search_key), None)
+        if existing and existing.crawl_result:
+            logger.info(f"搜索建议已存在:{search_key},跳过")
+            return existing
+        
+        logger.info(f"开始提交搜索建议任务:{search_key}")
+        
+        # Submit the Celery task
+        task = app.send_task('tasks.crawl_asin_exract_task.process_search_suggestion', args=[search_key])
+        
+        # Wait for the task to complete
+        result = AsyncResult(task.id)
+        result.get(timeout=timeout)
+        
+        # Handle the task result
+        if result.successful():
+            task_result = result.result
+            if task_result['status'] == 'success':
+                # Create a new analysis record
+                new_analyze = AICompetitorAnalyzeMainKeywords(
+                    search_key=search_key,
+                    crawl_result=SearchAmazoneKeyResult(**task_result['data']),
+                    created_at=datetime.now()
+                )
+                
+                # Append to the product's analysis list
+                product.competitor_analyze.append(new_analyze)
+                self.db_mongo.save(product)
+                
+                logger.info(f"搜索建议处理成功:{search_key}")
+                return new_analyze
+        
+        logger.error(f"搜索建议处理失败:{search_key}")
+        return None
+        
     async def extract_competitor_analysis(self, product: Product, timeout: int = 300):
         """异步提交竞争对手分析任务并等待结果
         Args:
@@ -103,8 +135,8 @@ class ManagerTask:
         tasks = []
         
         # Collect the tasks that need processing
-        for asin, competitor in product.competitor_analysis.items():
-            competitor: CompetitorAnalysis
+        for asin, competitor in product.competitor_crawl_data.items():
+            competitor: CompetitorCrawlData
             if competitor.extra_result:
                 logger.info(f"{asin}已缓存结果,跳过处理")
                 continue
@@ -145,7 +177,7 @@ async def test_product_mongo():
     asin_completed = manager.db.get_asin_completed()
     logger.info(f"{asin_completed}")
     for asin_model in asin_completed:
-        if not product.competitor_analysis or asin_model.asin not in product.competitor_analysis:
+        if not product.competitor_crawl_data or asin_model.asin not in product.competitor_crawl_data:
             logger.info(f"{asin_model.mhtml_path}")
             mthml_data = read_file(asin_model.mhtml_path)
             mhtml_path_name = Path(asin_model.mhtml_path).name
@@ -153,14 +185,14 @@ async def test_product_mongo():
             res = save_to_file(mthml_data, new_path)
             logger.info(f"new path {res}")
             continue
-            compet = CompetitorAnalysis(
+            compet = CompetitorCrawlData(
                 sql_id=asin_model.id,
                 asin=asin_model.asin, 
                 asin_area=asin_model.asin_area, 
                 mhtml_path=asin_model.mhtml_path,
                 extra_result_path=asin_model.extra_result_path,
                 created_at=asin_model.created_at,)
-            product.competitor_analysis[asin_model.asin] = compet
+            product.competitor_crawl_data[asin_model.asin] = compet
         await product.save()
     return product
 async def main():
@@ -170,7 +202,7 @@ async def main():
     product = await Product.find_one(Product.basic_info.name == "电线保护套")
     await manager.extract_competitor_analysis(product)
     return
-    for asin in product.competitor_analysis.keys():
+    for asin in product.competitor_crawl_data.keys():
         logger.info(f"{asin}")
         manager.submit_asinseed_task_and_wait(asin)
         manager.submit_extract_task_and_wait(asin)

+ 55 - 6
src/models/product_model.py

@@ -50,18 +50,64 @@ class Variant(BaseModel):
     cost_rmb: Optional[float] = None
     logistics_cost: Optional[float] = None
 
-class CompetitorAnalysis(BaseModel):
+class TrafficKeywordResult(BaseModel):
+    """流量关键词结果"""
+    traffic_keyword: Optional[str] = None
+    keyword_link: Optional[str] = None
+    monthly_searches: Optional[str] = None
+    amazon_search_link: Optional[str] = None
+
+class ProductImageInfo(BaseModel):
+    """产品图片信息"""
+    image_url: Optional[str] = None
+    goto_amazon: Optional[str] = None
+    main_text: Optional[str] = None
+    img_path: Optional[str] = None
+
+class ExtraResultModel(BaseModel):
+    """额外结果数据模型"""
+    result_table: Optional[List[TrafficKeywordResult]] = None
+    product_info: Optional[ProductImageInfo] = None
+    unique_words: Optional[List[str]] = None
+
+class CompetitorCrawlData(BaseModel):
     '''Competitor table'''
     sql_id: Optional[int] = Field(default=None)
     asin: Optional[str] = None
     asin_area: Optional[str] = 'JP'
     # S3 path of the crawled data
     extra_result_path: Optional[str] = None
-    extra_result: Optional[dict] = None
+    extra_result: Optional[ExtraResultModel] = None
     mhtml_path: Optional[str] = None
     error: Optional[str] = None
     created_at: Optional[datetime] = Field(default_factory=datetime.now)
 
+class SearchAmazoneKeyResult(BaseModel):
+    search_key:Optional[str] = None
+    suggestions:List[str] = []
+    mhtml_path:Optional[str] = None
+    screenshot:Optional[str] = None
+    error:Optional[int] = None
+    msg:Optional[str] = None
+    created_at:Optional[datetime] = Field(default_factory=datetime.now)
+
+class AICompetitorAnalyzeMainKeywords(BaseModel):
+    search_key:str
+    search_num: Optional[int] = 0
+    crawl_result: Optional[SearchAmazoneKeyResult] = Field(
+        default=None,
+        description="爬取AI分析出来的主关键词" 
+    )
+    tail_keys: Optional[List[str]] = Field(
+        default=[],
+        description="AI分析出来的长尾关键词" 
+    )
+
+class CompetitorAnalyze(BaseModel):
+    ai_analyze_main_keywords: Optional[List[AICompetitorAnalyzeMainKeywords]] = Field(
+        default=[],
+        description="AI分析的主关键词" 
+    )
 # Product main table (core entity)
 class Product(Document):
     basic_info: Optional[ProductBaseInfo] = Field(
@@ -72,10 +118,13 @@ class Product(Document):
         default=None, 
         description="营销信息,使用JSONB存储。卖点1、卖点2、产品介绍风格1、风格2。。。")
     
-    competitor_analysis: Optional[Dict[str, CompetitorAnalysis]] = Field(
+    competitor_crawl_data: Optional[Dict[str, CompetitorCrawlData]] = Field(
         default={},  # 明确设置默认值为 None
         description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。")
-    
+    competitor_analyze: Optional[List[AICompetitorAnalyzeMainKeywords]] = Field(
+        default=[],
+        description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。" 
+    )
     # Variants (version, model, color, bundle); each variant has its own price, cost, and other financial data
     variants: Optional[List[Variant]] = Field(
         default=None,
@@ -101,8 +150,8 @@ async def main():
         marketing=MarketingInfo(
             
         ),
-        competitor_analysis=[
-            CompetitorAnalysis(
+        competitor_crawl_data=[
+            CompetitorCrawlData(
                 
             ) 
         ],
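A sketch of how the reshaped models nest, using only fields defined above and the project's own access patterns (the ASIN and keyword values are illustrative):

```python
from src.manager.core.db_mongo import BaseMongoManager
from src.models.product_model import (
    Product, CompetitorCrawlData, ExtraResultModel, TrafficKeywordResult,
    AICompetitorAnalyzeMainKeywords, SearchAmazoneKeyResult,
)

async def attach_example_data():
    db = BaseMongoManager()
    await db.initialize()
    product = await Product.find_one(Product.basic_info.name == "电线保护套")
    # Crawled competitor data keyed by ASIN.
    product.competitor_crawl_data["B0B658JC22"] = CompetitorCrawlData(
        asin="B0B658JC22",
        extra_result=ExtraResultModel(
            result_table=[TrafficKeywordResult(traffic_keyword="cable protector",
                                               monthly_searches="12000")],
        ),
    )
    # AI-selected main keyword plus its later Amazon search-suggestion crawl.
    product.competitor_analyze.append(
        AICompetitorAnalyzeMainKeywords(
            search_key="cable protector",
            search_num=12000,
            crawl_result=SearchAmazoneKeyResult(search_key="cable protector"),
        )
    )
    await product.save()
```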

+ 34 - 0
src/tasks/crawl_asin_exract_task.py

@@ -1,5 +1,6 @@
 # tasks/extract_tasks.py
 import asyncio
+from datetime import datetime
 import json
 import os
 from pathlib import Path
@@ -7,6 +8,12 @@ import tempfile
 from upath import UPath
 from config.celery import app
 from src.browser.crawl_asin import Crawler
+from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey, CrawlerSearchKeyInput
 from config.settings import CFG,TEMP_PAGE_DIR
 from utils.drission_page import ChromeOptions
 from utils.file import read_file,save_to_file
@@ -44,6 +51,33 @@ async def async_process_mhtml(mhtml_path: str):
     finally:
         await asyncio.to_thread(os.unlink, temp_mhtml_path)
 
+async def async_process_search_suggestion(search_key: str):
+    """异步处理关键词建议"""
+    try:
+        # 完全按照crawl_amz_search_key.py的main函数实现
+        crawler = CrawlerAmzSearchKey(ChromeOptions(ini_path=CFG.chrome_config_ini))
+        search_input = CrawlerSearchKeyInput(
+            search_key=search_key,
+            mhtml_path=f"{CFG.s3_prefix}/output/amz/{search_key}.mhtml",
+            screenshot_path=f"{CFG.s3_prefix}/output/amz/{search_key}.png"
+        )
+        result = await crawler.crawl_suggestion(search_input)
+        return {'status': 'success', 'data': result.model_dump()}
+        
+    except Exception as e:
+        logger.error(f"搜索建议处理失败 | 关键词: {search_key} | 错误: {str(e)}")
+        return {'status': 'error', 'message': str(e)}
+
+@app.task(bind=True, name='tasks.crawl_asin_exract_task.process_search_suggestion', max_retries=0)
+def process_search_suggestion(self, search_key: str):
+    """处理亚马逊搜索建议的Celery任务"""
+    try:
+        loop = asyncio.get_event_loop()
+        return loop.run_until_complete(async_process_search_suggestion(search_key))
+    except Exception as e:
+        logger.exception(f"任务失败: {str(e)}")
+        raise
+
 @app.task(bind=True, name='tasks.crawl_asin_exract_task.extra_result', max_retries=0)
 def extra_result(self, mhtml_path: str):
     """异步Celery任务"""

+ 23 - 0
src/tools/backup_mongo.py

@@ -0,0 +1,23 @@
+import asyncio
+from datetime import datetime
+from typing import Optional, List
+from beanie import Document, Indexed, init_beanie
+from beanie.odm.operators.update.general import Set
+from pydantic import BaseModel
+from motor.motor_asyncio import AsyncIOMotorClient
+from config.settings import MONGO_URL, MONGO_DB_NAME
+from src.models.product_model import Product
+from src.manager.core.db_mongo import BaseMongoManager
+from utils.logu import get_logger
+from upath import UPath
+logger = get_logger('tools')
+
+async def test_product_mongo():
+    db_mongo = BaseMongoManager()
+    await db_mongo.initialize()
+    product = await Product.find_one(Product.basic_info.name == "电线保护套")
+async def main():
+    await test_product_mongo()
+
+if __name__ == "__main__":
+    asyncio.run(main())
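src/tools/backup_mongo.py is still a stub. One possible direction, sketched under the assumption that it should automate the timestamped mongodump call documented in the README (the tool path and backup root are the machine-specific values used there):

```python
import subprocess
from datetime import datetime
from pathlib import Path

MONGODUMP = r"G:\program\MongoDB\mongodb-database-tools-windows-x86_64-100.11.0\bin\mongodump.exe"
BACKUP_ROOT = Path(r"I:\eng\backup\mongo")

def dump_database(uri: str = "mongodb://sv-v2:27017") -> Path:
    """Run mongodump into a timestamped folder, e.g. I:\\eng\\backup\\mongo\\20250326_02_05_03."""
    out_dir = BACKUP_ROOT / datetime.now().strftime("%Y%m%d_%H_%M_%S")
    subprocess.run([MONGODUMP, f"--uri={uri}", "-o", str(out_dir)], check=True)
    return out_dir
```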

+ 2 - 2
tests/mytest/t_move_s3_file.py

@@ -7,7 +7,7 @@ from utils.file import save_to_file, read_file
 from config.celery import app
 # Remove direct task imports
 from celery.result import AsyncResult
-from src.models.product_model import Product,CompetitorAnalysis
+from src.models.product_model import Product,CompetitorCrawlData
 from utils.logu import get_logger
 from upath import UPath
 from src.manager.manager_task import ManagerTask
@@ -20,7 +20,7 @@ async def test_product_mongo():
     asin_completed = manager.db.get_asin_completed()
     for asin_model in asin_completed:
         key = asin_model.asin
-        competitor = product.competitor_analysis.get(key)
+        competitor = product.competitor_crawl_data.get(key)
         logger.info(f"{asin_model.mhtml_path}")
         mthml_data = read_file(asin_model.mhtml_path)
         mhtml_file_name = UPath(asin_model.mhtml_path).name