Selaa lähdekoodia

完成模板文件的拷贝,并且单独工作表竞品关键词的写入

mrh 8 kuukautta sitten
vanhempi
sitoutus
4706a631ea

+ 1 - 0
.clinerules

@@ -0,0 +1 @@
+重要:编码遵循高内聚、低耦合、可扩展,符合最佳程序设计,符合最佳实践

+ 6 - 0
docs/gpt/excel_product_sheet.md

@@ -0,0 +1,6 @@
+@/src/excel_tools/excel_writer.py 
+上述文件是完成了一个 excel 文件的其中一个工作表“竞品关键词调研1”。不过,其实文件还有很多工作表 "产品信息2" "产品信息" 。因此我希望新建另一个文件,用于生成各种各样的工作表,当前已经生成了 “竞品关键词调研1” 表,但不要写入文件,而是从模板文件 "G:\code\amazone\copywriting_production\output\resource\文案制作-template.xlsx" 中读取 "产品信息2" "产品信息"  全部信息包括样式。用于新的 excel 文件生成。注意模板文件是只读的。并且不能跨文件复制工作表,因此我觉得最好的做法就是通过 python 复制模板文件,然后打开复制后的文件进行编辑写入。
+
+在 excel_writer.py 中,保存到模板文件工作表命名应该是 “竞品关键词调研1” 。
+你来决定是否需要新建文件以及如何新建,如何重命名类,如何导入调用。
+请注意,如果遵循 “高内聚、低耦合、可扩展,符合最佳程序设计,符合最佳实践”原则,理论上一个类操作一个工作表对吗?上述文件已经完成了一个工作表的操作,那么 "产品信息"  表应该也是用另一个类来管理对吧?并且要新建一个单独的文件来管理 "产品信息" 表更好对吗?并且 excel_writer.py 这个文件名应该符合 “竞品关键词调研” 这个工作表的文件名表述或者类表述。

+ 11 - 0
src/excel_tools/__init__.py

@@ -0,0 +1,11 @@
+from .file_manager import ExcelFileManager
+from .writers import (
+    CompetitiveAnalysisWriter,
+    ProductInfoWriter
+)
+
+__all__ = [
+    'ExcelFileManager',
+    'CompetitiveAnalysisWriter',
+    'ProductInfoWriter'
+]

+ 1 - 0
src/excel_tools/excel_writer.py

@@ -268,3 +268,4 @@ if __name__ == "__main__":
     
     generator.apply_formatting()
     generator.save()
+

+ 86 - 0
src/excel_tools/file_manager.py

@@ -0,0 +1,86 @@
+import json
+from pathlib import Path
+import shutil
+from typing import Dict, Type, Any
+from openpyxl import load_workbook,Workbook
+from utils.file import read_file
+from src.excel_tools.writers import (
+    ExcelWriterBase,
+    CompetitiveAnalysisWriter,
+    ProductInfoWriter
+)
+from src.manager import DbManager
+from utils.logu import get_logger
+from config.settings import OUTPUT_DIR
+logger = get_logger('excel')
+
+class ExcelFileManager:
+    """Coordinate the creation and population of one output workbook.
+
+    On construction the output file is created as a copy of the template
+    workbook if it does not exist yet, and is then opened with openpyxl.
+    Individual worksheets are filled by dedicated writer classes that all
+    operate on the shared ``self.wb`` workbook.
+    """
+
+    TEMPLATE_PATH = Path(f"{OUTPUT_DIR}/resource/文案制作-template.xlsx")
+
+    def __init__(self, output_path: str = None, template_path: str = None):
+        """Open (or create from the template) the workbook at *output_path*.
+
+        Args:
+            output_path: Destination ``.xlsx`` file. Required; the ``None``
+                default only exists for signature compatibility.
+            template_path: Template to copy when the output file is missing.
+                Falls back to ``TEMPLATE_PATH``.
+
+        Raises:
+            TypeError: If *output_path* is ``None``.
+        """
+        if output_path is None:
+            # Path(None) would raise an opaque TypeError; fail with a clear one.
+            raise TypeError("output_path is required")
+        self.output_path = Path(output_path)
+        self.template_path = template_path or self.TEMPLATE_PATH
+        self.writers: Dict[str, ExcelWriterBase] = {}
+        self.db = DbManager()
+        self.wb: Workbook = self._prepare_workbook()
+        logger.info(f"{self.wb.sheetnames}")
+
+    def _prepare_workbook(self):
+        """Return the output workbook, copying the template first if needed."""
+        if not self.output_path.exists():
+            # copyfile() copies contents only. shutil.copy() would also copy
+            # the permission bits, so a read-only template would produce a
+            # read-only output file that cannot be saved later.
+            shutil.copyfile(self.template_path, self.output_path)
+
+        return load_workbook(self.output_path)
+
+    def save_all(self):
+        """Write all managed sheets, save the workbook and close it."""
+        try:
+            self.write_competie_sheet()
+            self.wb.save(self.output_path)
+        finally:
+            # Release the workbook even when writing or saving fails.
+            self.wb.close()
+
+    def write_competie_sheet(self, sheet_name: str = "竞品关键词调研", sheet_index: int = 0):
+        """Create and fill the competitor keyword sheet unless it already exists.
+
+        Args:
+            sheet_name: Title of the worksheet to create.
+            sheet_index: Position of the worksheet inside the workbook.
+        """
+        # ``wb.worksheets`` holds Worksheet objects, so a title must be
+        # looked up in ``wb.sheetnames`` for the existence check to work.
+        if sheet_name in self.wb.sheetnames:
+            logger.info(f"工作表已存在,跳过写入: {sheet_name}")
+            return
+
+        extract_data = self.load_s3_extract_data()
+        competitive_sheet_writer = CompetitiveAnalysisWriter(
+            self.wb, sheet_name=sheet_name, sheet_index=sheet_index
+        )
+        competitive_sheet_writer.add_data(extract_data)
+
+    def load_s3_extract_data(self):
+        """Load every completed ASIN record together with its extracted JSON.
+
+        Returns:
+            A list of dicts, one per record returned by
+            ``DbManager.get_asin_completed()``: the record's ``model_dump()``
+            plus an ``extra_result_data`` key holding the parsed JSON read
+            from ``extra_result_path``.
+        """
+        input_data = []
+        for model in self.db.get_asin_completed():
+            record = model.model_dump()
+            record['extra_result_data'] = json.loads(read_file(model.extra_result_path))
+            input_data.append(record)
+        return input_data
+
+def main():
+    """Build the sample workbook: fill the competitor sheet and save it.
+
+    The output path is a developer-local sample location.
+    """
+    excel_file = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\multi_data.xlsx")
+    excel_file.save_all()
+
+
+if __name__ == "__main__":
+    main()

+ 9 - 0
src/excel_tools/writers/__init__.py

@@ -0,0 +1,9 @@
+from .base_writer import ExcelWriterBase
+from .competitive_analysis import CompetitiveAnalysisWriter
+from .product_info import ProductInfoWriter
+
+__all__ = [
+    'ExcelWriterBase',
+    'CompetitiveAnalysisWriter',
+    'ProductInfoWriter'
+]

+ 22 - 0
src/excel_tools/writers/base_writer.py

@@ -0,0 +1,22 @@
+from pathlib import Path
+from typing import Dict, Any
+from openpyxl import load_workbook,Workbook
+import shutil
+
+class ExcelWriterBase:
+    """Base class for writers that each manage one worksheet of a workbook.
+
+    Subclasses must implement ``_init_worksheet`` (which is expected to set
+    ``self.ws``) and ``add_data``.
+    """
+    def __init__(self, work_book:Workbook, sheet_index: int=0, sheet_name: str="sheet"):
+        """Store the target workbook and sheet settings, then set up the sheet.
+
+        Args:
+            work_book: The already opened workbook the sheet belongs to.
+            sheet_index: Stored as ``self.sheet_index`` for subclasses to use.
+            sheet_name: Stored as ``self.sheet_name`` for subclasses to use.
+        """
+        self.sheet_index = sheet_index
+        self.sheet_name = sheet_name
+        self.wb = work_book
+        self.ws = None
+        # Runs during construction: a subclass that sets attributes after
+        # calling super().__init__() will not have them available in here.
+        self._init_worksheet()
+        
+        
+    def _init_worksheet(self):
+        """Initialise the worksheet (implemented by subclasses)."""
+        raise NotImplementedError
+        
+    def add_data(self, data: Dict[str, Any]):
+        """Add data to the worksheet (implemented by subclasses)."""
+        raise NotImplementedError
+        

+ 254 - 0
src/excel_tools/writers/competitive_analysis.py

@@ -0,0 +1,254 @@
+import base64
+from openpyxl.worksheet.worksheet import Worksheet
+from openpyxl.styles import Font, PatternFill, Alignment
+from openpyxl.utils import get_column_letter
+from pathlib import Path
+from typing import Dict, Any,List
+import json
+import pandas as pd
+from openpyxl.drawing.image import Image
+from openpyxl.formatting.rule import CellIsRule
+from io import BytesIO
+from utils.file import read_file
+from utils.logu import get_logger
+from openpyxl import load_workbook,Workbook
+from .base_writer import ExcelWriterBase
+
+logger = get_logger('excel')
+
+class ProductDataProcessor:
+    """Validate and reshape the extracted JSON payload of one product."""
+
+    # Columns of ``result_table`` that the worksheet writer relies on.
+    REQUIRED_COLUMNS = ('traffic_keyword', 'monthly_searches')
+
+    def __init__(self, json_data: Dict, asin: str):
+        """Keep the payload and ASIN and validate the payload.
+
+        Raises:
+            ValueError: If the payload has no ``result_table`` key.
+        """
+        self.json_data = json_data
+        self.asin = asin
+        self._validate_data()
+
+    def _validate_data(self):
+        """Ensure the mandatory ``result_table`` key is present."""
+        if 'result_table' not in self.json_data:
+            raise ValueError("Missing required 'result_table' in JSON data")
+
+    @staticmethod
+    def _parse_search_volume(value) -> int:
+        """Convert one raw search-volume value to ``int``.
+
+        Handles ``None``, empty strings and NaN (all become 0), strings with
+        thousands separators ("1,200") and float-typed values (1200.0).
+
+        Raises:
+            ValueError: If the value is not numeric at all.
+        """
+        if value is None:
+            return 0
+        text = str(value).replace(',', '').strip()
+        if not text or text == 'nan':
+            return 0
+        try:
+            return int(text)
+        except ValueError:
+            # pandas turns a numeric column containing gaps into floats, so
+            # "1200.0" must be accepted; non-numeric text still raises.
+            return int(float(text))
+
+    def get_sorted_dataframe(self) -> pd.DataFrame:
+        """Return ``result_table`` as a DataFrame sorted by search volume.
+
+        Rows without a ``traffic_keyword`` are dropped, ``monthly_searches``
+        is converted to ``int`` and rows are ordered from highest to lowest
+        volume with a fresh 0-based index.
+
+        Raises:
+            ValueError: If a non-empty table lacks a required column or a
+                search volume is not numeric.
+        """
+        df = pd.DataFrame(self.json_data['result_table'])
+        if df.empty:
+            # Nothing to write; give callers the columns they expect.
+            return pd.DataFrame(columns=list(self.REQUIRED_COLUMNS))
+
+        missing = [col for col in self.REQUIRED_COLUMNS if col not in df.columns]
+        if missing:
+            raise ValueError(f"Missing required columns in 'result_table': {missing}")
+
+        df['monthly_searches'] = df['monthly_searches'].apply(self._parse_search_volume)
+
+        df = df[df['traffic_keyword'].notna()].sort_values(
+            by='monthly_searches',
+            ascending=False
+        )
+        return df.reset_index(drop=True)
+
+    @property
+    def product_info(self) -> Dict:
+        """The ``product_info`` mapping of the payload (empty dict if absent)."""
+        return self.json_data.get('product_info', {})
+
+    @property
+    def unique_words(self) -> List[str]:
+        """Stripped ``word`` values of every ``unique_words`` entry that has one."""
+        return [
+            str(word['word']).strip()
+            for word in self.json_data.get('unique_words', [])
+            if 'word' in word
+        ]
+
+class CompetitiveAnalysisWriter(ExcelWriterBase):
+    """Writer for the competitor keyword research worksheet.
+
+    Every product occupies a group of ``COLUMN_SPACING`` columns: a keyword
+    column, a search-volume column and an unused spacer column. Row 1 holds
+    the product image, row 2 the headers and rows 3+ the keyword data.
+    """
+
+    COLUMN_SPACING = 3
+    HEADER_FILL = PatternFill(start_color='4F81BD', fill_type='solid')
+    HEADER_FONT = Font(bold=True, color='FFFFFF')
+    RED_FILL = PatternFill(start_color='FF0000', end_color="FF0000", fill_type='solid')
+
+    def __init__(self, work_book:Workbook, sheet_index: int=0, sheet_name: str="竞品关键词调研"):
+        """Bind to (or create) the sheet and reset the layout bookkeeping.
+
+        Args:
+            work_book: Workbook that owns the sheet.
+            sheet_index: Position the sheet should have in the workbook.
+            sheet_name: Title of the sheet.
+        """
+        super().__init__(work_book, sheet_index, sheet_name)
+        self.current_col = 1      # first column of the next product group
+        self.product_cols = []    # first column of every written product
+        self.max_data_rows = 0    # last row used by any keyword table
+
+    def _init_worksheet(self):
+        """Reuse the sheet if present (moving it to ``sheet_index``) or create it."""
+        if self.sheet_name in self.wb.sheetnames:
+            self.ws = self.wb[self.sheet_name]
+            # move_sheet() takes a relative offset, not an absolute index.
+            offset = self.sheet_index - self.wb.index(self.ws)
+            self.wb.move_sheet(self.ws, offset=offset)
+        else:
+            self.ws = self.wb.create_sheet(self.sheet_name, index=self.sheet_index)
+            logger.info(f"新建工作表: {self.sheet_name}")
+
+    def add_data(self, data: List[Dict[str, Any]]):
+        """Write every product in *data*, then apply the final formatting.
+
+        Args:
+            data: One dict per product with the keys ``asin`` and
+                ``extra_result_data`` (the parsed JSON payload).
+        """
+        for product_data in data:
+            logger.info(f"{product_data['asin']}, 处理中...")
+            self.add_product(product_data['extra_result_data'], product_data['asin'])
+
+        self.apply_formatting()
+
+    def add_product(self, data: dict, asin: str):
+        """Write one product into the next free column group.
+
+        A product whose payload is invalid is logged and skipped. Its column
+        group is neither recorded nor consumed, so the next product reuses it.
+
+        Args:
+            data: Parsed JSON payload of the product.
+            asin: ASIN used as the column header.
+        """
+        try:
+            processor = ProductDataProcessor(data, asin)
+            self._write_main_table(processor, asin)
+            self._write_additional_info(processor)
+            self._insert_product_image(processor.product_info)
+        except (json.JSONDecodeError, KeyError, ValueError) as e:
+            # Log the ASIN only: the payload can embed a base64 image.
+            logger.error(f'Error processing {asin}: {e}')
+            return
+
+        # Record and advance only after a successful write, so a failed
+        # product never leaves a stale or duplicate entry in product_cols.
+        self.product_cols.append(self.current_col)
+        self.current_col += self.COLUMN_SPACING
+
+    def _write_main_table(self, processor: ProductDataProcessor, asin: str):
+        """Write the header row (row 2) and the keyword rows (row 3 onwards)."""
+        df = processor.get_sorted_dataframe()
+        centered = Alignment(horizontal='center', vertical='center')
+
+        asin_cell = self.ws.cell(2, self.current_col, asin)
+        asin_cell.font = Font(bold=True, color='0000FF', underline='single')
+        asin_cell.fill = self.HEADER_FILL
+        asin_cell.alignment = centered
+
+        search_volume_cell = self.ws.cell(2, self.current_col + 1, "搜索量")
+        search_volume_cell.font = self.HEADER_FONT
+        search_volume_cell.fill = self.HEADER_FILL
+        search_volume_cell.alignment = centered
+
+        # The frame index is 0-based, data starts directly below the header.
+        for idx, row in df.iterrows():
+            data_row = idx + 3
+
+            # Keyword, rendered as a blue underlined link when a URL exists.
+            kw_cell = self.ws.cell(data_row, self.current_col, row['traffic_keyword'])
+            link = row.get('amazon_search_link')
+            if pd.notna(link):
+                kw_cell.hyperlink = link
+                kw_cell.font = Font(color='0000FF', underline='single')
+
+            # Search volume, stored as a plain integer.
+            search_cell = self.ws.cell(data_row, self.current_col + 1, int(row['monthly_searches']))
+            search_cell.number_format = 'General'
+
+        # Last data row is header row (2) + number of keyword rows.
+        self.max_data_rows = max(self.max_data_rows, len(df) + 2)
+
+        self.ws.column_dimensions[get_column_letter(self.current_col)].width = 35
+        self.ws.column_dimensions[get_column_letter(self.current_col + 1)].width = 15
+
+    def _write_additional_info(self, processor: ProductDataProcessor):
+        """Write the product description and unique words below the tables."""
+        start_row = self.max_data_rows + 3  # leave a gap below the longest table
+
+        # Product information block.
+        self.ws.cell(start_row, self.current_col, "产品信息").font = Font(bold=True)
+        info_text = processor.product_info.get('main_text', '')
+        if processor.product_info.get('goto_amazon'):
+            info_text += f"\n产品链接: {processor.product_info['goto_amazon']}"
+        info_cell = self.ws.cell(start_row+1, self.current_col, info_text)
+        info_cell.alignment = Alignment(wrap_text=True, vertical='top')
+        self.ws.column_dimensions[get_column_letter(self.current_col)].width = 35
+
+        # Unique words, one per row under their heading.
+        self.ws.cell(start_row+4, self.current_col, "唯一词").font = Font(bold=True)
+        for idx, word in enumerate(processor.unique_words, start=1):
+            self.ws.cell(start_row+4+idx, self.current_col, word)
+
+    def _insert_product_image(self, product_info: Dict):
+        """Place the base64 encoded product image in row 1 of the group.
+
+        Best effort: a missing image is ignored, a broken one is logged.
+        """
+        img_base64 = product_info.get('imgbase64')
+        if not img_base64:
+            return
+
+        try:
+            img_data = base64.b64decode(img_base64)
+            img = Image(BytesIO(img_data))
+
+            # Anchor the picture above the header row.
+            img_row = 1
+            img.anchor = f'{get_column_letter(self.current_col)}{img_row}'
+            self.ws.add_image(img)
+
+            # Make the row tall enough for the picture.
+            self.ws.row_dimensions[img_row].height = 150
+            self.max_data_rows = max(self.max_data_rows, 5)
+        except Exception as e:
+            logger.error(f'图片插入失败: {e}')
+
+    def apply_formatting(self):
+        """Apply conditional formatting and the sheet-wide alignment."""
+        self._apply_conditional_formatting()
+        # Column widths are set explicitly per product, so
+        # _adjust_column_widths() is not applied here.
+        self._set_global_alignment()
+
+    def _apply_conditional_formatting(self):
+        """Highlight search volumes above 10000 in every search-volume column."""
+        # Data starts in row 3. Without any data row the range would be
+        # inverted (e.g. "B3:B2"), so there is nothing to format.
+        if self.max_data_rows < 3:
+            return
+
+        red_rule = CellIsRule(
+            operator='greaterThan',
+            formula=['10000'],
+            stopIfTrue=True,
+            fill=self.RED_FILL
+        )
+
+        for start_col in self.product_cols:
+            # The search-volume column sits right of the keyword column.
+            col_letter = get_column_letter(start_col + 1)
+            cell_range = f"{col_letter}3:{col_letter}{self.max_data_rows}"
+            self.ws.conditional_formatting.add(cell_range, red_rule)
+
+    def _adjust_column_widths(self):
+        """Size every used column to its longest cell value."""
+        for col in range(1, self.current_col):
+            col_letter = get_column_letter(col)
+            max_length = max(
+                (len(str(cell.value)) for cell in self.ws[col_letter]),
+                default=0,
+            )
+            self.ws.column_dimensions[col_letter].width = (max_length + 2) * 1.2
+
+    def _set_global_alignment(self):
+        """Left-align keyword columns, centre all others, and wrap every cell."""
+        for row in self.ws.iter_rows():
+            for cell in row:
+                cell.alignment = Alignment(
+                    horizontal='left' if cell.column % self.COLUMN_SPACING == 1 else 'center',
+                    vertical='center',
+                    wrap_text=True
+                )

+ 57 - 0
src/excel_tools/writers/product_info.py

@@ -0,0 +1,57 @@
+from pathlib import Path
+from typing import Dict, Any
+
+from openpyxl.styles import Font, PatternFill, Alignment
+from openpyxl.utils import get_column_letter
+from openpyxl.worksheet.worksheet import Worksheet
+
+from utils.logu import logger
+from .base_writer import ExcelWriterBase
+
+class ProductInfoWriter(ExcelWriterBase):
+    """Writer for the product information worksheet.
+
+    Subclasses ``ExcelWriterBase`` so that ``self.wb`` is set and
+    ``_init_worksheet`` runs on construction, like the other sheet writers.
+    """
+    SHEET_NAME = "产品信息"
+
+    def __init__(self, work_book, sheet_index: int = None, sheet_name: str = SHEET_NAME):
+        """Bind to the workbook and prepare the sheet.
+
+        Args:
+            work_book: Workbook that owns the sheet.
+            sheet_index: Position for a newly created sheet; ``None`` appends
+                it after the existing sheets.
+            sheet_name: Title of the sheet, ``SHEET_NAME`` by default.
+        """
+        super().__init__(work_book, sheet_index, sheet_name)
+
+    def _init_worksheet(self):
+        """Create the sheet, or clear the existing one, then write the headers."""
+        if self.sheet_name not in self.wb.sheetnames:
+            self.ws = self.wb.create_sheet(title=self.sheet_name, index=self.sheet_index)
+            logger.warning(f"新建工作表: {self.sheet_name}")
+        else:
+            self.ws = self.wb[self.sheet_name]
+            # NOTE(review): this removes every existing row of a sheet that
+            # came from the template - confirm that is intended.
+            self.ws.delete_rows(1, self.ws.max_row)
+
+        self._init_headers()
+
+    def _init_headers(self):
+        """Write the fixed header row and set the column widths."""
+        headers = [
+            ("ASIN", 15),
+            ("产品名称", 30),
+            ("分类节点", 25),
+            ("上架时间", 15),
+            ("评分", 10),
+            ("评论数", 15)
+        ]
+
+        for col_idx, (header, width) in enumerate(headers, start=1):
+            cell = self.ws.cell(row=1, column=col_idx, value=header)
+            cell.font = Font(bold=True, color="FFFFFF")
+            cell.fill = PatternFill(start_color="4F81BD", fill_type="solid")
+            self.ws.column_dimensions[get_column_letter(col_idx)].width = width
+
+    def add_data(self, product_data: Dict[str, Any]):
+        """Append one product as a new row below the existing rows.
+
+        Args:
+            product_data: Mapping with the keys ``asin``, ``title``,
+                ``category``, ``launch_date``, ``rating`` and ``reviews``.
+
+        Raises:
+            ValueError: If any of the required keys is missing.
+        """
+        required_fields = ['asin', 'title', 'category', 'launch_date', 'rating', 'reviews']
+        if not all(field in product_data for field in required_fields):
+            raise ValueError("Missing required product data fields")
+
+        row = self.ws.max_row + 1
+        self.ws.cell(row=row, column=1, value=product_data['asin'])
+        self.ws.cell(row=row, column=2, value=product_data['title'])
+        self.ws.cell(row=row, column=3, value=product_data['category'])
+        self.ws.cell(row=row, column=4, value=product_data['launch_date']).number_format = 'YYYY-MM-DD'
+        self.ws.cell(row=row, column=5, value=product_data['rating']).number_format = '0.0'
+        self.ws.cell(row=row, column=6, value=product_data['reviews']).number_format = '#,##0'
+
+        logger.info(f"已写入产品数据: {product_data['asin']}")

+ 5 - 0
src/manager/__init__.py

@@ -0,0 +1,5 @@
+from .core.db import DbManager
+
+__all__ = [
+    'DbManager',
+]

+ 14 - 1
src/manager/core/db.py

@@ -29,7 +29,20 @@ class DbManager:
             return exist
         else:
             return self.save_asin_seed(asin_model)
-               
+
+    def get_asin_completed(self, to_dict:bool=False) -> list[AsinSeed]:
+        """Return every AsinSeed row that has both result paths set.
+
+        Selects the rows whose ``extra_result_path`` and ``mhtml_path`` are
+        both not NULL.
+
+        Args:
+            to_dict: When true, return ``model_dump()`` dicts instead of the
+                model instances.
+
+        Returns:
+            The matching rows as model instances, or as dicts when
+            *to_dict* is true.
+        """
+        # NOTE(review): the return annotation only covers the to_dict=False
+        # case; with to_dict=True a list of dicts is returned.
+        with Session(self.engine) as session:
+            statement = select(AsinSeed).where(
+                AsinSeed.extra_result_path.is_not(None),
+                AsinSeed.mhtml_path.is_not(None)
+            )
+            results = session.exec(statement)
+            list_model = results.all()
+            if to_dict:
+                return [model.model_dump() for model in list_model]
+            else:
+                return list_model
+                
 def main():
     asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     db_manager = DbManager()