Browse Source

引入 MongoDB 到 Product 产品表中

mrh 1 year ago
parent
commit
6e0e0025c5

+ 26 - 1
src/manager/core/db.py

@@ -4,6 +4,7 @@ from sqlmodel import SQLModel, create_engine, Session, select, Field
 from config.settings import DB_URL
 from utils.sql_engine import create_db_and_tables,drop_table,engine
 from src.models.asin_model import AsinSeed
+from src.models.product_model import Product, ProductKeyword, Category, ProductVariant, ProductFinancial, ProductCompetitorLink
 
 class DbManager:
     def __init__(self, engine: str=None):
@@ -42,7 +43,31 @@ class DbManager:
                 return [model.model_dump() for model in list_model]
             else:
                 return list_model
-                
+
+class ProductManager:
+    def __init__(self, engine: str=None):
+        self.engine = engine or create_engine(DB_URL)
+        create_db_and_tables()
+    
+    def add_product(self, product_model: Product):
+        with Session(self.engine) as session:
+            session.add(product_model)
+            session.commit()
+            session.refresh(product_model)
+            return product_model
+
+    def get_product(self, product_id: int) -> Product:
+        with Session(self.engine) as session:
+            statement = select(Product).where(Product.id == product_id)
+            results = session.exec(statement)
+            return results.first()
+    
+    def get_product_by_name(self, product_name: str) -> Product:
+        with Session(self.engine) as session:
+            statement = select(Product).where(Product.title == product_name)
+            results = session.exec(statement)
+            return results.first()
+    
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     db_manager = DbManager()

+ 9 - 11
src/manager/manager_task.py

@@ -2,7 +2,7 @@ from pathlib import Path
 from config.settings import CFG
 from src.manager.core.db import DbManager,AsinSeed
 from utils.file import save_to_file, read_file
-from src.tasks.crawl_asin_save_task import get_asin_and_save_page
+from src.tasks.crawl_asin_save_task import get_asin_and_save_page,get_amz_search_key_suggestion
 from src.tasks.crawl_asin_exract_task import extra_result
 from celery.result import AsyncResult
 from utils.logu import get_logger
@@ -13,7 +13,7 @@ class ManagerTask:
     def __init__(self):
         self.db = DbManager()
 
-    def submit_task_and_wait(self, asin: str, asin_area: str = 'JP',overwrite:bool=False, timeout: int = 300):
+    def submit_asinseed_task_and_wait(self, asin: str, asin_area: str = 'JP',overwrite:bool=False, timeout: int = 300):
         model = self.db.get_asin_seed(asin)
         if model and model.mhtml_path:
             logger.info(f"{asin}已经爬取过,跳过")
@@ -78,19 +78,17 @@ class ManagerTask:
             else:
                 self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
             return asin_seed
-    def upload_file(self, file_path: str, filename: str):
-        res = save_to_file(Path(file_path).read_text(), self.s3_prefix + '/' + filename)
-        return res
-    def upload_mhtml(self, file_path: str, s3_filename: str=None):
-        if not s3_filename:
-            s3_filename = Path(file_path).stem + '.mhtml'
-        res = self.upload_file(file_path, s3_filename)
-
+    def submit_suggestion_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
+        """提交页面解析任务并等待完成,保存结果到数据库"""
+        # 从数据库获取mhtml路径
+        asin_seed = self.db.get_asin_seed(asin)
+        if asin_seed and asin_seed.extra_result_path:
+            logger.info(f"{asin}已经解析过,跳过")
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     manager = ManagerTask()    
     for asin in asinseed_list:
-        manager.submit_task_and_wait(asin)
+        manager.submit_asinseed_task_and_wait(asin)
         manager.submit_extract_task_and_wait(asin)
     # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
     # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)

+ 90 - 132
src/models/product_model.py

@@ -1,159 +1,117 @@
+import asyncio
 from datetime import datetime
 from typing import Optional, Dict, List
 from sqlmodel import SQLModel, Field, Relationship, Column, JSON
 from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy import ForeignKeyConstraint
 from pydantic import BaseModel
+from beanie import Document, Indexed, init_beanie
+from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
 
 class MarketingInfo(BaseModel):
     """营销信息"""
+    title: Optional[str] = None
+    st_search: Optional[str] = None
     selling_point: Optional[List[str]] = None
-    product_style: Optional[str] = None
-    referent: Optional[dict] = {}
+    product_introduction: Optional[str] = None
 
-# 类目表(邻接表模式)
-class Category(SQLModel, table=True):
-    id: Optional[int] = Field(default=None, primary_key=True)
-    name: str = Field(index=True, nullable=False)
-    parent_id: Optional[int] = Field(default=None, foreign_key="category.id")
-    level: int = Field(default=1, description="类目层级(1级为最高)")
-    full_path: str = Field(
-        index=True,
-        description="完整类目路径(如:'家居/厨房用品/餐具')"
-    )
+class ProductBaseInfo(BaseModel):
+    """产品基本信息
+产品名称	电线保护套			
+包装内容	一个/袋			
+材质	TPU			
+颜色	三种颜色:黑色、银色、金色			
+尺寸	直径6MM,长度93-95CM			
+包裹尺寸	14*17*3CM			
+重量	30G			
+主要用途	保护电源线,车匝线,刹车线			
+主要卖点	
+- 优质TPU,健康环保好看,耐磨耐脏			
+- 耐高温防火阻燃,			
+- 预防线管老化,延长线路使用寿命			
+- 柔软易弯曲,可卷起来收纳			
+    """
+    name: Optional[str] = None
+    content: Optional[str] = None
+    material: Optional[str] = None
+    color: Optional[str] = None
+    size: Optional[str] = None
+    packaging_size: Optional[str] = None
+    weight: Optional[str] = None
+    main_usage: Optional[str] = None
+    selling_point: Optional[List[str]] = None
+
+# 价格成本专用表(与变体一对一关系)
+class Variant(BaseModel):
+    name: Optional[str] = None
+    # 核心财务字段(结构化存储)
+    base_price: Optional[float] = None
+    commission_rate: Optional[float] = None
+    fba_fee: Optional[float] = None
+    cost_rmb: Optional[float] = None
+    logistics_cost: Optional[float] = None
+
+class CompetitorAnalysis(BaseModel):
+    '''竞品表'''
+    sql_id: Optional[int] = Field(default=None)
+    asin: Optional[str] = None
+    asin_area: Optional[str] = 'JP'
+    # 爬取数据的 S3 路径
+    extra_result_path: Optional[str] = None
+    mhtml_path: Optional[str] = None
+    error: Optional[str] = None
+    created_at: Optional[datetime] = Field(default_factory=datetime.now)
 
 # 产品主表(核心实体)
-class Product(SQLModel, table=True):
-    id: Optional[int] = Field(default=None, primary_key=True)
-    title: str = Field(index=True, unique=True, nullable=False)
-    st_search: Optional[int] = Field(default=None)
-    # 卖点1、卖点2、产品介绍风格1、风格2、
+class Product(Document):
+    basic_info: Optional[ProductBaseInfo] = Field(
+        default=None,
+        description="产品基本信息"
+    )
     marketing: Optional[MarketingInfo] = Field(
-        default={}, 
-        sa_column=Column(JSONB),
+        default=None, 
         description="营销信息,使用JSONB存储。卖点1、卖点2、产品介绍风格1、风格2。。。")
     
-    # 动态扩展字段(竞品主关键词分析、竞品长尾词分析)
-    competitor_analysis: Optional[dict] = Field(
-        default={},
-        sa_column=Column(JSONB),
+    competitor_analysis: Optional[List[CompetitorAnalysis]] = Field(
         description="竞品分析信息,使用JSONB存储。竞品主关键词分析、竞品长尾词分析。。。")
     
-    leaf_category_id: Optional[int] = Field(
-        foreign_key="category.id",
-        description="产品所属的末级类目ID"
-    )
-
     # 变体,如版本、型号、颜色、套餐,各个变体对应着价格、成本等财务数据
-    variants: List["ProductVariant"] = Relationship(back_populates="product")
-
-    dynamic_attributes: Optional[Dict] = Field(
-        default={},
-        sa_column=Column(JSONB),
-        description="动态扩展属性,使用JSONB存储"
-    )
-    
-    # 对象存储引用(S3路径管理)
-    object_ref: Optional[Dict] = Field(
-        default={},
-        sa_column=Column(JSONB),
-        description="S3对象引用,格式示例:{'main_image':'s3://...','attachments':[...]}"
+    variants: Optional[List[Variant]] = Field(
+        default=None,
+        description="产品变体信息,使用JSONB存储。如:{'color':['黑色','金色'],'size':['1m','3m']}" 
     )
-    
     created_at: Optional[datetime] = Field(default_factory=datetime.now)
     updated_at: Optional[datetime] = Field(default=None)
 
 
-# 产品变体表(一对多关系)
-class ProductVariant(SQLModel, table=True):
-    id: Optional[int] = Field(default=None, primary_key=True)
-    product_id: int = Field(foreign_key="product.id", nullable=False)
-    variant_name: str = Field(max_length=100, description="变体名称(如:黑色1m/金色3m)")
-    variant_type: str = Field(
-        default="color", 
-        max_length=20,
-        description="变体类型:color/size/package"
-    )
-    # 变体属性(动态扩展)
-    attributes: Dict = Field(
-        default={},
-        sa_column=Column(JSONB),
-        description="变体属性(如颜色代码、尺寸规格)"
-    )
-    
-    # 与财务数据的一对一关系
-    financial: Optional["ProductFinancial"] = Relationship(
-        back_populates="variant",
-        sa_relationship_kwargs={"uselist": False}
+async def main():
+    # Beanie uses Motor async client under the hood 
+    client = AsyncIOMotorClient("mongodb://sv-v2:27017")
+    # 不需要传参数据库名,直接通过 client.test 属性名的方式就能连接或创建 test 数据库
+    # 或者通过 client["test"] 的方式连接或创建 test 数据库
+    await init_beanie(database=client["amazone"], document_models=[Product])
+    # await insert_object()
+    product = Product(
+        basic_info=ProductBaseInfo(
+            name="电线保护套",
+            size="直径6MM,长度93-95CM",
+            weight="30G",
+        ),
+        marketing=MarketingInfo(
+            
+        ),
+        competitor_analysis=[
+            CompetitorAnalysis(
+                
+            ) 
+        ],
+        variants=[
+            Variant(),
+        ],
     )
+    print(product.model_dump_json(indent=2))
+    await product.insert()
 
-
-
-# 价格成本专用表(与变体一对一关系)
-class ProductFinancial(SQLModel, table=True):
-    variant_id: int = Field(
-        default=None,
-        foreign_key="productvariant.id",
-        primary_key=True
-    )
-    # 核心财务字段(结构化存储)
-    base_price: float = Field(nullable=False)
-    commission_rate: float = Field()
-    fba_fee: float = Field()
-    cost_rmb: float = Field()
-    logistics_cost: float = Field()
+if __name__ == "__main__":
+    asyncio.run(main())
     
-    # 动态计算字段(JSON存储)
-    calculation_Dict:Optional[Dict] = Field(
-        default={},
-        sa_column=Column(JSONB),
-        description="汇率/利润率等动态计算字段"
-    )
-    # 与变体的反向关系
-    variant: Optional[ProductVariant] = Relationship(back_populates="financial")
-
-# 优化后的产品-竞品关系表
-class ProductCompetitorLink(SQLModel, table=True):
-    product_id: Optional[int] = Field(
-        default=None,
-        foreign_key="product.id",
-        primary_key=True
-    )
-    competitor_id: Optional[int] = Field(
-        default=None,
-        foreign_key="asinseed.id",
-        primary_key=True
-    )
-    relation_type: Optional[str] = Field(
-        default="similar",
-        max_length=20,
-        primary_key=True,
-        description="关联类型:similar/alternative等"
-    )
-    similarity_metrics: Optional[Dict] = Field(
-        default={},
-        sa_column=Column(JSONB),
-        description="相似度指标数据"
-    )
-    weight: float = Field(
-        default=1.0,
-        description="关联权重(用于多竞品对比时加权计算)"
-    )
-
-# 增强版关键词体系
-class ProductKeyword(SQLModel, table=True):
-    id: Optional[int] = Field(default=None, primary_key=True)
-    product_id: int = Field(foreign_key="product.id", nullable=False)
-    keyword_type: str = Field(
-        default="main",
-        max_length=20,
-        description="关键词类型:main/long-tail/related"
-    )
-    language: str = Field(default="ja", max_length=10)
-    keyword: str = Field(index=True, nullable=False)
-    search_volume: Optional[int] = Field()
-    # 关键词关联的变体(可选)
-    variant_id: Optional[int] = Field(
-        foreign_key="productvariant.id",
-        description="特定变体关联的关键词"
-    )

+ 43 - 0
src/models/sql/电线保护套.json

@@ -0,0 +1,43 @@
+{
+    "id": null,
+    "basic_info": {
+      "name": "电线保护套",
+      "content": null,
+      "material": null,
+      "color": null,
+      "size": "直径6MM,长度93-95CM",
+      "packaging_size": null,
+      "weight": "30G",
+      "main_usage": null,
+      "selling_point": null
+    },
+    "marketing": {
+      "title": null,
+      "st_search": null,
+      "selling_point": null,
+      "product_introduction": null
+    },
+    "competitor_analysis": [
+      {
+        "sql_id": null,
+        "asin": null,
+        "asin_area": "JP",
+        "extra_result_path": null,
+        "mhtml_path": null,
+        "error": null,
+        "created_at": "2025-03-23T05:14:50.313438"
+      }
+    ],
+    "variants": [
+      {
+        "name": null,
+        "base_price": null,
+        "commission_rate": null,
+        "fba_fee": null,
+        "cost_rmb": null,
+        "logistics_cost": null
+      }
+    ],
+    "created_at": "2025-03-23T05:14:50.313438",
+    "updated_at": null
+  }

+ 18 - 0
src/tasks/crawl_asin_save_task.py

@@ -1,6 +1,7 @@
 # tasks/save_tasks.py
 from config.celery import app
 from src.browser.crawl_asin import Crawler
+from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey,SearchKeyResult,CrawlerSearchKeyInput
 from config.settings import CFG
 from utils.drission_page import ChromeOptions
 from utils.file import check_exists, save_to_file,s3_uri_to_http_url
@@ -31,6 +32,23 @@ def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP',
         logger.info(f"任务成功: {final_path}")
         return {'status': 'success', 'path': final_path}
         
+    except Exception as e:
+        logger.exception(f"任务失败:{e}")
+        self.retry(exc=e, countdown=60)
+
+@app.task(bind=True)
+def get_amz_search_key_suggestion(self, input_data: CrawlerSearchKeyInput):
+    try:
+        logger.info(f"任务开始: input_data {input_data}")
+        loop = asyncio.get_event_loop()
+
+        # 初始化浏览器配置
+        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
+        crawler = CrawlerAmzSearchKey(chrome_options=chrome_options)
+        res = loop.run_until_complete(crawler.crawl_suggestion(input_data))
+        logger.info(f"异步任务完成: {res}")
+        return res
+        
     except Exception as e:
         logger.exception(f"任务失败:{e}")
         self.retry(exc=e, countdown=60)

+ 18 - 12
tests/mytest/t_odm_beanie.py

@@ -6,7 +6,6 @@ import asyncio
 from typing import Optional
 
 from motor.motor_asyncio import AsyncIOMotorClient,AsyncIOMotorDatabase
-from pydantic import BaseModel
 
 from beanie import Document, Indexed, init_beanie
 from beanie.odm.operators.update.general import Set
@@ -65,20 +64,27 @@ async def update_one():
     # https://beanie-odm.dev/api-documentation/operators/update/#set
     from beanie.odm.operators.update.general import Set
     # 按条件更新
-    bar = await Product.find(Product.name == '数据线').update(Set({Product.price: 1}))
+    product = await Product.find(Product.name == '数据线').update(Set({Product.price: 1}))
     # 用字典的方式更新也可以
-    bar = await Product.find(Product.name == '数据线').update({"$set": {Product.price: 2}})
+    product = await Product.find(Product.name == '数据线').update({"$set": {Product.price: 2}})
     # 所有文档,只要符合条件 Product.price > .5 都更新
-    bar = await Product.find(Product.price > .5).update(Set({Product.price: 1}))
+    product = await Product.find(Product.price > .5).update(Set({Product.price: 1}))
     # 更新一个
-    bar = await Product.find_one(Product.name == '数据线').update(Set({Product.price: 1}))
+    product = await Product.find_one(Product.name == '数据线').update(Set({Product.price: 1}))
+    # 嵌套更新
+    product.variant.append(Variant(category='电子', description='3M 红色 包裹', name='数据线', price=20))
+    await product.update({'$set': {Product.variant: product.variant}})
+    
+    # 整个文档更新替换。当文档存在 _id 字段值时,会更新整个文档,而不需要指定 update 字段。
+    product:Product  = await Product.find_one(Product.name == '数据线')
+    product.variant.append(Variant(category='电子', description='5M 绿色', name='数据线', price=25))
+    await product.save()
+
     '''
     product:Product  = await Product.find_one(Product.name == '数据线')
-    bar = await product.update(Set({Product.price: 300}))
-    bar.variant.append(Variant(category='电子', description='3M 红色 包裹', name='数据线', price=20))
-    # variant = bar.variant[0] if len(bar.variant) > 0 else None
-    # print(variant)
-    await product.update({'$set': {Product.variant: bar.variant}})
+    product = await product.update(Set({Product.price: 300}))
+    # print(product)
+    await product.save()
 
 async def backup_data(
     source_model: Document,  # 源模型(如 Product)
@@ -105,8 +111,8 @@ async def example():
     # 或者通过 client["test"] 的方式连接或创建 test 数据库
     await init_beanie(database=client["test"], document_models=[Product])
     # await insert_object()
-    # await update_one()
-    await backup_data(Product, client["backup_test"])
+    await update_one()
+    # await backup_data(Product, client["backup_test"])
 
 
 if __name__ == "__main__":