Răsfoiți Sursa

完成批量写入 excel

mrh 8 luni în urmă
părinte
comite
ffefa3bf7b

+ 2 - 1
.gitignore

@@ -9,4 +9,5 @@ wheels/
 # Virtual environments
 .venv
 output/
-.env
+.env
+.aider*

+ 5 - 0
.vscode/settings.json

@@ -0,0 +1,5 @@
+{
+    "terminal.integrated.env.windows": {
+    "pythonpath": "${workspaceFolder}/"
+}
+}

+ 1 - 0
docs/gpt/excel_writer_usage.py

@@ -1,4 +1,5 @@
 from src.excel_tools.excel_writer import ExcelWriter
+from src.models.asin_model import TrafficKeywordModel, ProductInfoModel
 import json
 input_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0B658JC22_extract.json'
 

+ 9 - 4
docs/gpt/to_excel.md

@@ -44,20 +44,25 @@
 第一列:标题是来自传参 asin ,例如 "B0CQ1SHD8V" 。每行的内容是来自 result_table 的 "traffic_keyword" 字段。如果可以的话,将每行的内容用超链接链接到 result_table 的 "amazon_search_link" 字段。
 第二列:标题是搜索量 ,每行的内容是来自 result_table 的 "monthly_searches" 字段。
 
-并且在第一列标题附近把 Base64 图片插入到表格中
-
 因为 result_table 是一个表格总和,它们都有相同的行数,它们可能要筛选或者排列,我希望能够按搜索量从大到小进行排序,由于各个列的情况都不同,你来决定是用 python 原生代码排序再写入,还是写入到 excel 中再用 excel 接口进行排序。而且超过 1 万的数值需要标红。
 
 但是 product_info 和 unique_words 是一个单独的字段,如果嵌入到同一个表格中,会影响 table 的排序,你觉得如何构造 excel 表格,放在最后一行可以吗?比如在 result_table 生成的 excel 结果下方新增 product_info 和 unique_words 的行。
 
-请移除掉模板文件,直接生成 excel 即可。
+并且 Base64 图片插入到产品信息行所在的位置附近。
+
 文件中只获取了单个 json 文件,未来需要读取多个 json 文件,写入到表格中,用空列隔开。例如上述案例中,存在2列,然后隔开一列,从第三列新增新的 json 文件,新增各自的 result_table 、 product_info 、 unique_words,因此需要保持代码的兼容性,可扩展性。
 
+目前生成的excel存在问题:
+- 重构不完整
+- 我已将 json 文件改成数据模型,DataProcessor、DataUtils、BaseExcelComponent 还有存在的必要吗?请删去,并且不需要校验功能。
+- 不要硬编码。例如 img_data.startswith('/9j/') 这种写法大错特错。
+
+
+
 请重构代码。
 必须符合最佳编码规范,高内聚,低耦合,用类来管理模块。
 
 
-必须读取 "G:\code\amazone\copywriting_production\output\3月新品-文案制作.xlsx" 这个模板文件,将生成的内容另存为通路径下另一个文件。模板文件是只读的。
 
 
 

+ 203 - 128
src/excel_tools/excel_writer.py

@@ -1,5 +1,5 @@
-from dataclasses import dataclass
 import json
+import pandas as pd
 from openpyxl import Workbook
 from openpyxl.drawing.image import Image
 from openpyxl.formatting.rule import CellIsRule
@@ -7,168 +7,243 @@ from openpyxl.styles import PatternFill, Font, Alignment
 from openpyxl.utils import get_column_letter
 from io import BytesIO
 import base64
-from typing import Dict, List, Any
 from pathlib import Path
+from typing import Dict, List, Tuple
 from utils.file import read_file
 from utils.logu import logger
 
-RED_FILL = PatternFill(start_color='FFFF0000', end_color='FFFF0000', fill_type='solid')
+# Style constants
+RED_FILL = PatternFill(start_color='FFFF0000', fill_type='solid')
 HEADER_FONT = Font(bold=True, color='FFFFFF')
 HEADER_FILL = PatternFill(start_color='4F81BD', patternType='solid')
+COLUMN_SPACING = 3  # each product occupies 3 columns (keyword, search volume, blank spacer)
+# Make sure HEADER_FILL uses the correct parameters; this assignment supersedes the one above
+HEADER_FILL = PatternFill(start_color='4F81BD', fill_type='solid')
+
+class ProductDataProcessor:
+    """JSON数据处理中心"""
+    def __init__(self, json_data: Dict, asin: str):
+        """Store the parsed JSON payload and the ASIN, then validate the payload.
+
+        Raises:
+            ValueError: if ``json_data`` has no ``'result_table'`` key.
+        """
+        self.json_data = json_data
+        self.asin = asin
+        self._validate_data()
+        
+    def _validate_data(self):
+        """数据校验"""
+        if 'result_table' not in self.json_data:
+            raise ValueError("Missing required 'result_table' in JSON data")
+
+    def get_sorted_dataframe(self) -> pd.DataFrame:
+        """获取排序后的DataFrame"""
+        df = pd.DataFrame(self.json_data['result_table'])
+        
+        # 数据清洗和类型转换
+        df['monthly_searches'] = df['monthly_searches'].apply(
+            lambda x: int(str(x).replace(',', '')) if x else 0
+        )
+        
+        # 过滤无效数据并排序
+        df = df[df['traffic_keyword'].notna()].sort_values(
+            by='monthly_searches', 
+            ascending=False
+        )
+        return df.reset_index(drop=True)
 
-@dataclass
-class DataProcessor:
-    """处理单个JSON文件的数据转换"""
-    json_data: Dict[str, Any]
-    asin: str
-    
-    def process_table_data(self) -> List[Dict[str, Any]]:
-        """处理表格主体数据"""
-        return [
-            {
-                'traffic_keyword': item.get('traffic_keyword', ''),
-                'amazon_search_link': item.get('amazon_search_link', ''),
-                'monthly_searches': item.get('monthly_searches', '0').replace(',', '')
-            }
-            for item in self.json_data.get('result_table', [])
-            if item.get('traffic_keyword')
-        ]
-    
     @property
-    def product_info(self) -> Dict[str, str]:
-        """提取产品基础信息"""
+    def product_info(self) -> Dict:
+        """获取产品信息"""
         return self.json_data.get('product_info', {})
-    
+
     @property
     def unique_words(self) -> List[str]:
-        """提取唯一词列表"""
-        return [word.get('word', '') for word in self.json_data.get('unique_words', [])]
+        """获取唯一词列表"""
+        return [
+            str(word['word']).strip() 
+            for word in self.json_data.get('unique_words', [])
+            if 'word' in word
+        ]
 
-class ExcelWriter:
-    """Excel文件写入器"""
-    
+class ExcelGenerator:
+    """Excel文件生成器"""
     def __init__(self, output_path: str):
         self.wb = Workbook()
         self.ws = self.wb.active
         self.output_path = Path(output_path)
-        self.current_col = 1  # 当前写入列位置
+        self.current_col = 1
+        self.max_data_rows = 0  # 记录最大数据行数
         
-    def add_json_data(self, json_path: str, asin: str):
-        """添加单个JSON文件数据"""
-        str_data = read_file(json_path)
-        data = json.loads(str_data)
-        processor = DataProcessor(data, asin)
+    def add_product(self, json_path: str, asin: str):
+        """添加产品数据"""
+        try:
+            # 加载并处理数据
+            data = json.loads(read_file(json_path))
+            processor = ProductDataProcessor(data, asin)
+            
+            # 写入主数据表
+            self._write_main_table(processor, asin)
+            
+            # 写入附加信息
+            self._write_additional_info(processor)
+            
+            # 插入产品图片
+            self._insert_product_image(processor.product_info)
+            
+            # 移动到下一组列
+            self.current_col += COLUMN_SPACING
+            
+        except (json.JSONDecodeError, ValueError) as e:
+            logger.error(f'Error processing {json_path}: {e}')
+
+    
+    def _write_main_table(self, processor: ProductDataProcessor, asin: str):
+        """写入主表格数据"""
+        df = processor.get_sorted_dataframe()
         
         # 写入表头
-        self._write_header(processor)
+        asin_cell = self.ws.cell(1, self.current_col, asin)
+        asin_cell.font = HEADER_FONT
+        asin_cell.fill = HEADER_FILL
+        asin_cell.alignment = Alignment(horizontal='center', vertical='center')
         
-        # 写入表格数据
-        self._write_table_data(processor)
+        search_volume_cell = self.ws.cell(1, self.current_col + 1, "搜索量")
+        search_volume_cell.font = HEADER_FONT
+        search_volume_cell.fill = HEADER_FILL
+        search_volume_cell.alignment = Alignment(horizontal='center', vertical='center')
         
-        # 写入附加信息
-        self._write_additional_info(processor)
-        
-        self.current_col += 3  # 数据列+间隔列
-    
-    def _write_header(self, processor: DataProcessor):
-        """写入表头(含图片)"""
-        # ASIN标题
-        header_cell = self.ws.cell(
-            row=1,
-            column=self.current_col,
-            value=processor.asin
-        )
-        header_cell.font = HEADER_FONT
-        header_cell.fill = HEADER_FILL
-        
-        # 插入Base64图片
-        img_data = processor.product_info.get('imgbase64', '')
-        if img_data and img_data.startswith('/9j/'):
-            try:
-                img = Image(BytesIO(base64.b64decode(img_data)))
-                img.anchor = f'{get_column_letter(self.current_col)}2'
-                self.ws.add_image(img)
-            except Exception as e:
-                logger.error(f'图片插入失败: {e}')
-                
-        # 搜索量标题
-        self.ws.cell(
-            row=1,
-            column=self.current_col + 1,
-            value='搜索量'
-        ).font = HEADER_FONT
-    
-    def _write_table_data(self, processor: DataProcessor):
-        """写入表格主体数据"""
-        for row_idx, item in enumerate(processor.process_table_data(), start=3):
-            # 关键词超链接
-            self.ws.cell(
-                row=row_idx,
-                column=self.current_col,
-                value=item['traffic_keyword']
-            ).hyperlink = item['amazon_search_link']
+        # 使用pandas写入数据
+        for idx, row in df.iterrows():
+            data_row = idx + 2  # Excel行号从2开始
             
-            # 搜索量数值处理
-            search_volume = item['monthly_searches']
-            cell = self.ws.cell(
-                row=row_idx,
-                column=self.current_col + 1,
-                value=int(search_volume) if search_volume.isdigit() else 0
-            )
+            # 关键词(带超链接)
+            kw_cell = self.ws.cell(data_row, self.current_col, row['traffic_keyword'])
+            if pd.notna(row.get('amazon_search_link')):
+                kw_cell.hyperlink = row['amazon_search_link']
             
-            # 条件格式设置(超过1万标红)
-            if cell.value > 10000:
-                cell.fill = RED_FILL
-    
-    def _write_additional_info(self, processor: DataProcessor):
-        """写入附加信息到表格下方"""
-        max_row = self.ws.max_row
-        base_row = max_row + 2
+            # 搜索量
+            search_cell = self.ws.cell(data_row, self.current_col + 1, row['monthly_searches'])
+            search_cell.number_format = '#,##0'
+        
+        # 更新最大行数
+        self.max_data_rows = max(self.max_data_rows, len(df) + 1)
+        
+        # 设置初始列宽
+        self.ws.column_dimensions[get_column_letter(self.current_col)].width = 35
+        self.ws.column_dimensions[get_column_letter(self.current_col + 1)].width = 15
+
+    def _write_additional_info(self, processor: ProductDataProcessor):
+        """写入附加信息"""
+        start_row = self.max_data_rows + 3  # 间隔3行
         
         # 产品信息
-        self.ws.cell(base_row, self.current_col, '产品信息:').font = Font(bold=True)
-        self.ws.cell(base_row + 1, self.current_col, processor.product_info.get('main_text', ''))
+        self.ws.cell(start_row, self.current_col, "产品信息").font = Font(bold=True)
+        info_text = "\n".join([
+            f"标题: {processor.product_info.get('title', '')}",
+            f"评分: {processor.product_info.get('stars', '')}",
+            f"价格: {processor.product_info.get('price', '')}"
+        ])
+        info_cell = self.ws.cell(start_row+1, self.current_col, info_text)
+        info_cell.alignment = Alignment(wrap_text=True, vertical='top')
         
-        # 唯一词列表
-        self.ws.cell(base_row + 3, self.current_col, '唯一词:').font = Font(bold=True)
+        # 唯一词
+        self.ws.cell(start_row+4, self.current_col, "唯一词").font = Font(bold=True)
         for idx, word in enumerate(processor.unique_words, start=1):
-            self.ws.cell(base_row + 3 + idx, self.current_col, word)
+            self.ws.cell(start_row+4+idx, self.current_col, word)
+
+    def _insert_product_image(self, product_info: Dict):
+        """插入产品图片"""
+        img_base64 = product_info.get('imgbase64')
+        if not img_base64:
+            return
+            
+        try:
+            img_data = base64.b64decode(img_base64)
+            img = Image(BytesIO(img_data))
+            
+            # 图片位置:附加信息上方
+            img_row = self.max_data_rows + 2
+            img.anchor = f'{get_column_letter(self.current_col)}{img_row}'
+            self.ws.add_image(img)
+            
+            # 调整行高
+            self.ws.row_dimensions[img_row].height = 150
+        except Exception as e:
+            logger.error(f'图片插入失败: {e}')
+
+    def apply_formatting(self):
+        """应用最终格式"""
+        self._apply_conditional_formatting()
+        self._adjust_column_widths()
+        self._set_global_alignment()
+
+    def _apply_conditional_formatting(self):
+        """应用条件格式"""
+        red_rule = CellIsRule(
+            operator='greaterThan',
+            formula=['10000'],
+            fill=RED_FILL
+        )
         
-    def apply_styles(self):
-        """应用全局样式"""
-        # 设置列宽自适应
-        for col in self.ws.columns:
-            max_length = max(
-                len(str(cell.value)) for cell in col
-                if cell.value is not None
+        # 每隔一列应用条件格式
+        for col in range(2, self.current_col, COLUMN_SPACING):
+            col_letter = get_column_letter(col)
+            self.ws.conditional_formatting.add(
+                f'{col_letter}2:{col_letter}{self.max_data_rows}',
+                red_rule
             )
-            self.ws.column_dimensions[get_column_letter(col[0].column)].width = max_length + 2
+
+    def _adjust_column_widths(self):
+        """自动调整列宽"""
+        for col in range(1, self.current_col):
+            max_length = 0
+            col_letter = get_column_letter(col)
             
-        # 设置标题对齐
-        for row in self.ws.iter_rows(min_row=1, max_row=1):
+            for cell in self.ws[col_letter]:
+                try:
+                    value_length = len(str(cell.value))
+                    if value_length > max_length:
+                        max_length = value_length
+                except:
+                    pass
+                
+            adjusted_width = (max_length + 2) * 1.2
+            self.ws.column_dimensions[col_letter].width = adjusted_width
+
+    def _set_global_alignment(self):
+        """设置全局对齐"""
+        for row in self.ws.iter_rows():
             for cell in row:
-                cell.alignment = Alignment(horizontal='center')
-    
+                cell.alignment = Alignment(
+                    horizontal='left' if cell.column % COLUMN_SPACING == 1 else 'center',
+                    vertical='center',
+                    wrap_text=True
+                )
+
     def save(self):
         """保存文件"""
-        self.output_path.parent.mkdir(parents=True, exist_ok=True)
-        self.wb.save(self.output_path)
-        logger.info(f'Excel文件已保存至: {self.output_path}')
+        try:
+            self.output_path.parent.mkdir(parents=True, exist_ok=True)
+            self.wb.save(self.output_path)
+            logger.success(f'文件保存成功: {self.output_path}')
+            return True
+        except Exception as e:
+            logger.error(f'文件保存失败: {e}')
+            return False
+        finally:
+            self.wb.close()
 
-def main():
-    output = r"G:\code\amazone\copywriting_production\output\multi_data.xlsx"
+# 使用示例
+if __name__ == "__main__":
     json_files = [
-        (r"G:\code\amazone\copywriting_production\output\page\debug\B0B658JC22_extract.json", "B0B658JC22"),
-        # 添加更多文件示例
-        # (r"path\to\other.json", "ASIN123")
+        (r"s3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22_extract.json", "B0B658JC22"),
+        (r"s3://public/amazone/copywriting_production/output/B0CQ1SHD8V/B0CQ1SHD8V_extract.json", "B0CQ1SHD8V"),
+        (r"s3://public/amazone/copywriting_production/output/B0DQ84H883/B0DQ84H883_extract.json", "B0DQ84H883"),
+        (r"s3://public/amazone/copywriting_production/output/B0D44RT8R8/B0D44RT8R8_extract.json", "B0D44RT8R8"),
     ]
-    logger.info(f"{json_files}")
-    writer = ExcelWriter(output)
+    output_path = r"G:\code\amazone\copywriting_production\output\multi_data.xlsx"
+    
+    generator = ExcelGenerator(output_path)
+    
     for json_path, asin in json_files:
-        writer.add_json_data(json_path, asin)
+        generator.add_product(json_path, asin)
     
-    writer.apply_styles()
-    writer.save()
-
-if __name__ == "__main__":
-    main()
+    generator.apply_formatting()
+    generator.save()

+ 33 - 0
src/models/asin_model.py

@@ -2,6 +2,8 @@ from datetime import datetime
 from typing import Optional
 from sqlmodel import SQLModel, create_engine, Session, select, Field
 from config.settings import DB_URL
+from typing import List, Optional
+from pydantic import BaseModel
 
 class AsinSeed(SQLModel, table=True):
     id: Optional[int] = Field(default=None, primary_key=True)
@@ -11,3 +13,34 @@ class AsinSeed(SQLModel, table=True):
     mhtml_path: Optional[str] = None
     error: Optional[str] = None
     created_at: Optional[datetime] = Field(default_factory=datetime.now)
+
+
+
+class TrafficKeywordModel(BaseModel):
+    traffic_keyword: str
+    monthly_searches: str
+    keyword_link: Optional[str] = None
+    amazon_search_link: Optional[str] = None
+
+    @property
+    def monthly_searches_int(self) -> int:
+        """Convert monthly searches to integer"""
+        try:
+            return int(str(self.monthly_searches).strip().replace(',', '')) if self.monthly_searches else 0
+        except ValueError:
+            return 0
+
+
+
+class ProductInfoModel(BaseModel):
+    image_url: Optional[str] = None
+    goto_amazon: Optional[str] = None
+    main_text: Optional[str] = None
+    imgbase64: Optional[str] = None
+    unique_words: Optional[List[str]] = None
+
+
+    @property
+    def main_text_short(self) -> str:
+        """Get first 100 characters of main text"""
+        return (self.main_text or '')[:100] + '...' if len(self.main_text or '') > 100 else self.main_text