Selaa lähdekoodia

新增竞品爬取结果 json 转换为 model ;新增 S3 管理器用来读取数据库的 S3 路径

mrh 8 kuukautta sitten
vanhempi
sitoutus
30a45a012e

+ 14 - 18
src/excel_tools/file_manager.py

@@ -9,9 +9,12 @@ from src.excel_tools.writers import (
     CompetitiveAnalysisWriter,
     ProductInfoWriter
 )
-from src.manager import DbManager
+from src.manager import DbManager,StorageManager
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
+from src.models.asin_model import TrafficKeywordModel, ProductInfoModel,AsinExtraResultModel
+
+
 logger = get_logger('excel')
 
 class ExcelFileManager:
@@ -21,7 +24,7 @@ class ExcelFileManager:
         self.output_path = Path(output_path)
         self.template_path = template_path or self.TEMPLATE_PATH
         self.writers: Dict[str, ExcelWriterBase] = {}
-        self.db = DbManager()
+        self.s3_storage_manager = StorageManager()
         self.wb:Workbook = self._prepare_workbook()
         logger.info(f"{self.wb.sheetnames}")
         logger.info(f"{self.wb.worksheets}")
@@ -34,32 +37,25 @@ class ExcelFileManager:
         return load_workbook(self.output_path)
 
     def save_all(self):
-        self.write_competie_sheet()
+        """Save the workbook to ``self.output_path`` and close it.
+
+        The competitive sheet is no longer written implicitly here; call
+        ``write_competitive_sheet`` first when that sheet is needed.
+        """
         self.wb.save(self.output_path)
         self.wb.close()
     
-    def write_competie_sheet(self, sheet_name: str = "竞品关键词调研", sheet_index: int = 0, overwrite: bool = False):
+    def write_competitive_sheet(self, extract_data:dict, sheet_name: str = "竞品关键词调研", sheet_index: int = 0, overwrite: bool = False):
+        """Create the competitive-analysis sheet and fill it with ``extract_data``.
+
+        Args:
+            extract_data: Data handed unchanged to
+                ``CompetitiveAnalysisWriter.add_data``.
+                NOTE(review): annotated as ``dict`` while
+                ``load_s3_extract_data`` returns a list of
+                ``AsinExtraResultModel`` - confirm the expected shape.
+            sheet_name: Name of the worksheet to create.
+            sheet_index: Forwarded to the writer as ``sheet_index``.
+            overwrite: When true, an existing sheet with this name is
+                removed before the new one is written.
+
+        Nothing is written when the sheet already exists and ``overwrite``
+        is false.
+        """
         if overwrite and sheet_name in self.wb.sheetnames:
             self.wb.remove(self.wb[sheet_name])
         if sheet_name not in self.wb.sheetnames:
-            extract_data = self.load_s3_extract_data()
             competitive_sheet_writer = CompetitiveAnalysisWriter(self.wb, sheet_name=sheet_name, sheet_index=sheet_index)
             competitive_sheet_writer.add_data(extract_data)
-    def load_s3_extract_data(self):
-        list_model = self.db.get_asin_completed()
-        input_data = []
-        for model in list_model:
-            extra_result_data = json.loads(read_file(model.extra_result_path))
-            model_dump = model.model_dump()
-            model_dump['extra_result_data'] = extra_result_data
-            input_data.append(model_dump)
-        # self.add_data('competitive', input_data)
-        return input_data
+    def load_s3_extract_data(self) -> list[AsinExtraResultModel]:
+        """Return the extract results as ``AsinExtraResultModel`` objects.
+
+        Delegates to ``self.s3_storage_manager.load_s3_complete_extract_data``.
+        """
+        return self.s3_storage_manager.load_s3_complete_extract_data()
 
 def main():
-    excel_file = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\multi_data.xlsx")
-    # excel_file.write_competie_sheet()
-    excel_file.save_all()
+    self = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\multi_data.xlsx")
+    extract_data_lsit = self.load_s3_extract_data()
+    logger.info(f"{extract_data_lsit}")
+    logger.info(f"{len(extract_data_lsit)}")
+    # self.write_competie_sheet(extract_data)
+    # self.save_all()
     return
     competi_sheet = CompetitiveAnalysisWriter(excel_file.output_path)
 

+ 1 - 0
src/excel_tools/writers/competitive_analysis.py

@@ -13,6 +13,7 @@ from utils.file import read_file
 from utils.logu import get_logger
 from openpyxl import load_workbook,Workbook
 from .base_writer import ExcelWriterBase
+from src.models.asin_model import TrafficKeywordModel, ProductInfoModel
 
 logger = get_logger('excel')
 

+ 2 - 0
src/manager/__init__.py

@@ -1,5 +1,7 @@
 from .core.db import DbManager
+from .core.storage import StorageManager
 
 __all__ = [
     'DbManager',
+    'StorageManager'
 ]

+ 25 - 0
src/manager/core/storage.py

@@ -0,0 +1,25 @@
+import json
+from .db import DbManager
+
+from config.settings import CFG
+from utils.file import save_to_file, read_file
+from src.models.asin_model import AsinSeed, AsinExtraResultModel
+
+class StorageManager:
+    """Load stored crawl-result JSON for ASIN rows and convert it to models."""
+
+    def __init__(self, db_manager: DbManager=None):
+        """Use ``db_manager`` when given, otherwise create a new ``DbManager``."""
+        # Identity check: a manager that was passed in explicitly must not be
+        # swapped for a fresh one just because it evaluates as falsy.
+        self.db_manager = DbManager() if db_manager is None else db_manager
+
+    @staticmethod
+    def _load_extra_result(model) -> AsinExtraResultModel:
+        """Read the JSON at ``model.extra_result_path`` and build the model.
+
+        ``model`` must expose ``asin``, ``asin_area`` and ``extra_result_path``.
+        """
+        extra_result_data = json.loads(read_file(model.extra_result_path))
+        return AsinExtraResultModel.json_to_model(
+            asin=model.asin,
+            asin_area=model.asin_area,
+            extra_result=extra_result_data,
+        )
+
+    def load_s3_complete_extract_data(self) -> list[AsinExtraResultModel]:
+        """Return one ``AsinExtraResultModel`` per row of ``get_asin_completed()``."""
+        return [
+            self._load_extra_result(model)
+            for model in self.db_manager.get_asin_completed()
+        ]
+
+    def load_s3_extract_data_by_asin(self, asin: str) -> AsinExtraResultModel:
+        """Return the ``AsinExtraResultModel`` for the row of a single ``asin``.
+
+        NOTE(review): if ``get_asin_seed`` can return ``None`` for an unknown
+        ASIN, this raises ``AttributeError`` - confirm against ``DbManager``.
+        """
+        model = self.db_manager.get_asin_seed(asin)
+        return self._load_extra_result(model)

+ 35 - 3
src/models/asin_model.py

@@ -15,10 +15,9 @@ class AsinSeed(SQLModel, table=True):
     created_at: Optional[datetime] = Field(default_factory=datetime.now)
 
 
-
 class TrafficKeywordModel(BaseModel):
-    traffic_keyword: str
-    monthly_searches: str
+    traffic_keyword: Optional[str] = None
+    monthly_searches: Optional[str] = None
     keyword_link: Optional[str] = None
     amazon_search_link: Optional[str] = None
 
@@ -44,3 +43,36 @@ class ProductInfoModel(BaseModel):
     def main_text_short(self) -> str:
         """Get first 100 characters of main text"""
         return (self.main_text or '')[:100] + '...' if len(self.main_text or '') > 100 else self.main_text
+
+    @classmethod
+    def json_to_model(cls, extra_result: dict) -> 'ProductInfoModel':
+        """Build a ProductInfoModel from a crawl-result dict.
+
+        Args:
+            extra_result: Parsed JSON. ``extra_result['product_info']`` supplies
+                the model fields; the optional ``extra_result['unique_words']``
+                is a list of dicts whose ``'word'`` values are stripped and
+                collected into ``unique_words``.
+
+        Raises:
+            KeyError: If ``extra_result`` has no ``'product_info'`` entry.
+        """
+        unique_words = [
+            str(word['word']).strip()
+            # ``or []`` also covers an explicit null; entries that are not
+            # dicts are skipped instead of failing on ``word['word']``.
+            for word in extra_result.get('unique_words') or []
+            if isinstance(word, dict) and 'word' in word
+        ]
+        # Merge into one dict so that a 'unique_words' key inside product_info
+        # cannot collide with the explicit keyword argument (TypeError).
+        fields = {**extra_result['product_info'], 'unique_words': unique_words}
+        return cls(**fields)
+        
+
+class AsinExtraResultModel(BaseModel):
+    # Parsed crawl result for one ASIN: its keyword rows plus product info.
+    asin: str
+    asin_area: str
+    # One TrafficKeywordModel per item of the JSON 'result_table' list.
+    result_table: List[TrafficKeywordModel]
+    # Built from the same JSON via ProductInfoModel.json_to_model.
+    product_info: ProductInfoModel
+
+    @classmethod
+    def json_to_model(cls, asin: str, asin_area: str, extra_result: dict) -> 'AsinExtraResultModel':
+        """Assemble the model from ``asin``, ``asin_area`` and the parsed JSON.
+
+        ``extra_result`` must contain a ``'result_table'`` list of dicts; the
+        whole dict is also passed to ``ProductInfoModel.json_to_model``.
+        """
+        # Product info is parsed first, matching the original evaluation order.
+        parsed_product = ProductInfoModel.json_to_model(extra_result)
+        keyword_rows = []
+        for row in extra_result['result_table']:
+            keyword_rows.append(TrafficKeywordModel(**row))
+        return cls(
+            asin=asin,
+            asin_area=asin_area,
+            result_table=keyword_rows,
+            product_info=parsed_product,
+        )