|
|
@@ -0,0 +1,270 @@
|
|
|
+import json
|
|
|
+import pandas as pd
|
|
|
+from openpyxl import Workbook
|
|
|
+from openpyxl.drawing.image import Image
|
|
|
+from openpyxl.formatting.rule import CellIsRule
|
|
|
+from openpyxl.styles import PatternFill, Font, Alignment
|
|
|
+from openpyxl.utils import get_column_letter
|
|
|
+from io import BytesIO
|
|
|
+import base64
|
|
|
+from pathlib import Path
|
|
|
+from typing import Dict, List, Tuple
|
|
|
+from utils.file import read_file
|
|
|
+from utils.logu import logger
|
|
|
+
|
|
|
# Style constants
# Solid red fill used by the conditional-format rule; start and end colors are
# both given as 6-digit RGB hex values.
RED_FILL = PatternFill(start_color='FF0000', end_color='FF0000', fill_type='solid')
HEADER_FONT = Font(bold=True, color='FFFFFF')
# Solid blue fill for the header row (defined once; an earlier duplicate
# assignment that used the `patternType` spelling was a dead store).
HEADER_FILL = PatternFill(start_color='4F81BD', fill_type='solid')
COLUMN_SPACING = 3  # columns per product: keyword, search volume, blank spacer
|
|
|
+
|
|
|
class ProductDataProcessor:
    """Wrap one product's parsed JSON payload and expose cleaned views of it.

    Args:
        json_data: Parsed JSON document; must contain a ``result_table`` key.
        asin: Identifier of the product the payload belongs to.

    Raises:
        ValueError: If ``result_table`` is missing from ``json_data``.
    """

    # Fields every non-empty ``result_table`` row set must provide.
    _REQUIRED_COLUMNS = ('traffic_keyword', 'monthly_searches')

    def __init__(self, json_data: Dict, asin: str):
        self.json_data = json_data
        self.asin = asin
        self._validate_data()

    def _validate_data(self):
        """Reject payloads that do not carry a ``result_table`` entry."""
        if 'result_table' not in self.json_data:
            raise ValueError("Missing required 'result_table' in JSON data")

    @staticmethod
    def _parse_search_volume(value) -> int:
        """Convert one raw ``monthly_searches`` value to an int.

        Missing values (``None``, NaN, empty string) become 0. Strings may use
        thousands separators (``"1,200"``) or a float spelling (``"2500.0"``).

        Raises:
            ValueError: If a non-empty string is not numeric.
        """
        # `value != value` is only true for NaN; avoids importing math here.
        if value is None or (isinstance(value, float) and value != value):
            return 0
        if isinstance(value, (int, float)):
            return int(value)
        text = str(value).replace(',', '').strip()
        if not text:
            return 0
        try:
            return int(text)
        except ValueError:
            # Accept float-like spellings; a truly non-numeric string still
            # raises ValueError from float().
            return int(float(text))

    def get_sorted_dataframe(self) -> pd.DataFrame:
        """Return the keyword table sorted by search volume, highest first.

        Rows without a ``traffic_keyword`` are dropped, ``monthly_searches`` is
        normalised to integers, and the index is reset to 0..n-1. Rows with
        equal volume keep their original relative order.

        Returns:
            The cleaned table. For an empty ``result_table`` this is an empty
            frame that still has the two required columns.

        Raises:
            ValueError: If rows exist but a required column is absent, or a
                search volume cannot be parsed.
        """
        df = pd.DataFrame(self.json_data['result_table'])
        if df.empty:
            # Nothing to clean; hand back a frame with the expected schema so
            # callers can index the columns without a KeyError.
            return pd.DataFrame(columns=list(self._REQUIRED_COLUMNS))

        missing = [col for col in self._REQUIRED_COLUMNS if col not in df.columns]
        if missing:
            raise ValueError(
                f"'result_table' rows lack required field(s): {missing}"
            )

        # Clean and convert before filtering so we never assign into a slice.
        df['monthly_searches'] = df['monthly_searches'].apply(
            self._parse_search_volume
        )

        # mergesort is stable, which keeps tie order deterministic.
        df = df[df['traffic_keyword'].notna()].sort_values(
            by='monthly_searches',
            ascending=False,
            kind='mergesort',
        )
        return df.reset_index(drop=True)

    @property
    def product_info(self) -> Dict:
        """Product metadata dict; empty when the key is absent or null."""
        return self.json_data.get('product_info') or {}

    @property
    def unique_words(self) -> List[str]:
        """Stripped ``word`` values from the ``unique_words`` entries.

        Entries that are not dicts, or dicts without a ``word`` key, are
        skipped.
        """
        return [
            str(entry['word']).strip()
            for entry in (self.json_data.get('unique_words') or [])
            # The dict check matters: on a plain string, `'word' in entry`
            # would be a substring test and indexing would then fail.
            if isinstance(entry, dict) and 'word' in entry
        ]
|
|
|
+
|
|
|
class ExcelGenerator:
    """Build a one-sheet workbook that lays products out side by side.

    Every product takes ``COLUMN_SPACING`` columns starting at
    ``current_col``: keyword, search volume, and a blank spacer. Row 1 is
    used for the product image, row 2 for the header and rows 3+ for data.

    Args:
        output_path: Destination of the ``.xlsx`` file written by ``save``.
    """

    def __init__(self, output_path: str):
        self.wb = Workbook()
        self.ws = self.wb.active
        self.output_path = Path(output_path)
        self.current_col = 1  # first column of the product being written
        self.max_data_rows = 0  # highest row index reserved by any product so far
        self.product_cols: List[int] = []  # first column of every written product

    def add_product(self, json_path: str, asin: str) -> None:
        """Load one product's JSON file and write it into the next column group.

        Data errors are logged and the product is skipped; they never
        propagate to the caller.

        Args:
            json_path: Location handed to ``read_file``.
            asin: Product identifier shown in the header cell.
        """
        try:
            data = json.loads(read_file(json_path))
            processor = ProductDataProcessor(data, asin)
            # The table is written before the column group is recorded, so a
            # product whose data cannot be processed leaves no stale entry in
            # product_cols.
            self._write_main_table(processor, asin)
        except (json.JSONDecodeError, ValueError, KeyError) as e:
            logger.error(f'Error processing {json_path}: {e}')
            return

        self.product_cols.append(self.current_col)
        try:
            self._write_additional_info(processor)
            self._insert_product_image(processor.product_info)
        except (ValueError, KeyError) as e:
            logger.error(f'Error processing {json_path}: {e}')
        finally:
            # The main table is already on the sheet, so always move on;
            # otherwise the next product would overwrite these columns.
            self.current_col += COLUMN_SPACING

    def _write_main_table(self, processor: ProductDataProcessor, asin: str) -> None:
        """Write the header (row 2) and keyword rows (row 3 onward).

        Also fixes the initial widths of the two columns and raises
        ``max_data_rows`` to cover this product's last data row.
        """
        # Fetch the data first: if it is malformed this raises before any
        # cell has been touched.
        df = processor.get_sorted_dataframe()

        keyword_col = self.current_col
        volume_col = keyword_col + 1
        centered = Alignment(horizontal='center', vertical='center')

        # Header row: ASIN (blue, underlined) and the search-volume title.
        asin_cell = self.ws.cell(2, keyword_col, asin)
        asin_cell.font = Font(bold=True, color='0000FF', underline='single')
        asin_cell.fill = HEADER_FILL
        asin_cell.alignment = centered

        volume_header = self.ws.cell(2, volume_col, "搜索量")
        volume_header.font = HEADER_FONT
        volume_header.fill = HEADER_FILL
        volume_header.alignment = centered

        # The link column is optional in the input.
        if 'amazon_search_link' in df.columns:
            links = df['amazon_search_link']
        else:
            links = [None] * len(df)

        link_font = Font(color='0000FF', underline='single')
        rows = zip(df['traffic_keyword'], df['monthly_searches'], links)
        for data_row, (keyword, searches, link) in enumerate(rows, start=3):
            # Keyword, hyperlinked when a non-empty link is available.
            kw_cell = self.ws.cell(data_row, keyword_col, keyword)
            if pd.notna(link) and link:
                kw_cell.hyperlink = link
                kw_cell.font = link_font

            # Search volume, stored as a plain integer.
            volume_cell = self.ws.cell(data_row, volume_col, int(searches))
            volume_cell.number_format = 'General'

        # Last data row of this product is len(df) + 2 (data starts at row 3).
        self.max_data_rows = max(self.max_data_rows, len(df) + 2)

        self.ws.column_dimensions[get_column_letter(keyword_col)].width = 35
        self.ws.column_dimensions[get_column_letter(volume_col)].width = 15

    def _write_additional_info(self, processor: ProductDataProcessor) -> None:
        """Write product info and the unique-word list below the tables.

        The section starts three rows below ``max_data_rows``, i.e. below the
        longest table written so far, not necessarily this product's own.
        """
        start_row = self.max_data_rows + 3
        col = self.current_col
        product_info = processor.product_info or {}

        # Product info: title cell, then the text one row below.
        self.ws.cell(start_row, col, "产品信息").font = Font(bold=True)
        info_text = product_info.get('main_text') or ''  # tolerate null text
        if product_info.get('goto_amazon'):
            info_text += f"\n产品链接: {product_info['goto_amazon']}"
        info_cell = self.ws.cell(start_row + 1, col, info_text)
        info_cell.alignment = Alignment(wrap_text=True, vertical='top')
        self.ws.column_dimensions[get_column_letter(col)].width = 35

        # Unique words: title at start_row + 4, one word per row after it.
        self.ws.cell(start_row + 4, col, "唯一词").font = Font(bold=True)
        for idx, word in enumerate(processor.unique_words, start=1):
            self.ws.cell(start_row + 4 + idx, col, word)

    def _insert_product_image(self, product_info: Dict) -> None:
        """Anchor the product's base64 image at row 1 of the current column.

        Does nothing when no ``imgbase64`` value is present. Any failure is
        logged and swallowed so a bad image never blocks the rest of the sheet.
        """
        img_base64 = (product_info or {}).get('imgbase64')
        if not img_base64:
            return

        try:
            # A data URI ("data:image/png;base64,....") carries a header that
            # is not part of the payload; drop everything up to the comma.
            if isinstance(img_base64, str) and img_base64.startswith('data:'):
                img_base64 = img_base64.partition(',')[2]

            img = Image(BytesIO(base64.b64decode(img_base64)))

            img_row = 1  # image sits above the header row
            img.anchor = f'{get_column_letter(self.current_col)}{img_row}'
            self.ws.add_image(img)

            # Make the image row tall enough to show the picture.
            self.ws.row_dimensions[img_row].height = 150
            # Reserve at least 5 rows once an image exists.
            # NOTE(review): data starts at row 3, so the reason for 5 is not
            # evident from this file - confirm the intended value.
            self.max_data_rows = max(self.max_data_rows, 5)
        except Exception as e:
            logger.error(f'图片插入失败: {e}')

    def apply_formatting(self) -> None:
        """Apply sheet-wide formatting; call after all products are added."""
        self._apply_conditional_formatting()
        # NOTE: _adjust_column_widths() is not invoked here, so the fixed
        # widths set while writing each table stay in effect.
        self._set_global_alignment()

    def _apply_conditional_formatting(self) -> None:
        """Highlight search volumes above 10000 in every search-volume column."""
        if self.max_data_rows < 3:
            # No data rows anywhere; "X3:X2" would be a reversed range.
            return

        red_rule = CellIsRule(
            operator='greaterThan',
            formula=['10000'],
            stopIfTrue=True,
            fill=RED_FILL,
        )

        # The search-volume column is always the product's start column + 1.
        for start_col in self.product_cols:
            col_letter = get_column_letter(start_col + 1)
            cell_range = f"{col_letter}3:{col_letter}{self.max_data_rows}"
            self.ws.conditional_formatting.add(cell_range, red_rule)
            logger.info(f"应用条件格式到 {cell_range} (值 > 10000)")

    def _adjust_column_widths(self) -> None:
        """Size every used column to its longest cell text (currently unused)."""
        for col in range(1, self.current_col):
            col_letter = get_column_letter(col)
            # Empty cells are ignored instead of being measured as "None".
            max_length = max(
                (
                    len(str(cell.value))
                    for cell in self.ws[col_letter]
                    if cell.value is not None
                ),
                default=0,
            )
            self.ws.column_dimensions[col_letter].width = (max_length + 2) * 1.2

    def _set_global_alignment(self) -> None:
        """Set alignment on every cell of the sheet.

        Keyword columns (column index % COLUMN_SPACING == 1) are left-aligned,
        all others centered; everything is vertically centered and wrapped.
        This replaces any alignment assigned earlier, including the header
        cells and the product-info cell.
        """
        # Two objects reused for all cells instead of one new object per cell.
        left = Alignment(horizontal='left', vertical='center', wrap_text=True)
        center = Alignment(horizontal='center', vertical='center', wrap_text=True)
        for row in self.ws.iter_rows():
            for cell in row:
                is_keyword_col = cell.column % COLUMN_SPACING == 1
                cell.alignment = left if is_keyword_col else center

    def save(self) -> bool:
        """Write the workbook to ``output_path``, creating parent directories.

        Returns:
            True on success, False if saving failed (the error is logged).
            The workbook is closed in both cases.
        """
        try:
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            self.wb.save(self.output_path)
            logger.success(f'文件保存成功: {self.output_path}')
            return True
        except Exception as e:
            logger.error(f'文件保存失败: {e}')
            return False
        finally:
            self.wb.close()
|
|
|
+
|
|
|
# Usage example
if __name__ == "__main__":
    # Each product's extract lives at <base>/<ASIN>/<ASIN>_extract.json.
    base_uri = r"s3://public/amazone/copywriting_production/output"
    asins = ("B0B658JC22", "B0CQ1SHD8V", "B0DQ84H883", "B0D44RT8R8")
    sources = [
        (f"{base_uri}/{code}/{code}_extract.json", code)
        for code in asins
    ]
    target = r"G:\code\amazone\copywriting_production\output\multi_data.xlsx"

    builder = ExcelGenerator(target)
    for source_path, code in sources:
        builder.add_product(source_path, code)

    # Formatting has to run after every product is on the sheet.
    builder.apply_formatting()
    builder.save()
|