mrh пре 3 месеци
родитељ
комит
77fa3defdd

+ 11 - 1
src/excel_tools/file_manager.py

@@ -37,7 +37,17 @@ class ExcelFileManager:
             # 确保输出目录存在
             self.output_path.parent.mkdir(parents=True, exist_ok=True)
             
-            shutil.copy2(self.template_path, self.output_path)
+            # 检查模板文件是否存在
+            if self.template_path and self.template_path.exists():
+                shutil.copy2(self.template_path, self.output_path)
+                logger.info(f"使用模板文件: {self.template_path}")
+            else:
+                # 如果模板文件不存在,创建新的空白工作簿
+                from openpyxl import Workbook
+                wb = Workbook()
+                wb.save(self.output_path)
+                logger.info(f"模板文件不存在,创建新的空白工作簿: {self.output_path}")
+            
             # 设置文件为可读写(兼容Windows)
             try:
                 import os

+ 2 - 1
src/excel_tools/writers/competitive_analysis.py

@@ -191,7 +191,8 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
         
         # 唯一词
         self.ws.cell(start_row+4, self.current_col, "唯一词").font = Font(bold=True)
-        for idx, word in enumerate(processor.unique_words, start=1):
+        unique_words = processor.unique_words or []
+        for idx, word in enumerate(unique_words, start=1):
             self.ws.cell(start_row+4+idx, self.current_col, word)
 
     def _insert_product_image(self, product_info: ProductImageInfo):

+ 11 - 0
src/flow_task/db/product_import_db.py

@@ -154,6 +154,17 @@ class ProductImportManager:
             
         Returns:
             List[ProductImport]: 按asin_exists筛选的记录列表
+            
+            [{'asin_exists': False,
+            'competitor': 'B0020FO356',
+            'created_at': datetime.datetime(2025, 8, 8, 15, 57, 6, 997403),
+            'id': 1,
+            'product_name': '1P双头压刀镊子'},
+            {'asin_exists': False,
+            'competitor': 'B00F8BH8XS',
+            'created_at': datetime.datetime(2025, 8, 8, 15, 57, 6, 997403),
+            'id': 1,
+            'product_name': '1P双头压刀镊子'},...]
         """
         sql_query = "SELECT id, product_name, created_at, competitor, asin_exists FROM public.monthly_product_imports WHERE asin_exists = :asin_exists"
         params = {"asin_exists": asin_exists}

+ 413 - 0
tests/flow_run/t_flow_run_excel_generator.py

@@ -0,0 +1,413 @@
+from datetime import datetime
+from typing import List, Optional
+import asyncio
+from pathlib import Path
+from src.flow_task.db.product_import_db import product_import_manager
+from src.excel_tools.file_manager import ExcelFileManager
+from utils.logu import get_logger
+from utils.file import read_file
+from config.settings import OUTPUT_DIR
+
+logger = get_logger('flow_run_excel_generator')
+
+class ExcelGeneratorFlow:
+    """Excel文件生成流程"""
+    
+    def __init__(self):
+        # 检查多个可能的模板文件位置,优先检查4月份目录
+        possible_paths = [
+            Path(f"{OUTPUT_DIR}/resource/4月份/大尺寸化妆棉-文案制作模版.xlsx"),
+            Path(f"{OUTPUT_DIR}/resource/-文案制作模版.xlsx"),
+            Path(f"{OUTPUT_DIR}/resource/文案大尺寸化妆棉制作-template.xlsx")
+        ]
+        
+        self.default_template_path = None
+        for path in possible_paths:
+            if path.exists():
+                self.default_template_path = path
+                logger.info(f"找到默认模板文件: {path}")
+                break
+        
+        if not self.default_template_path:
+            logger.warning(f"未找到默认模板文件,尝试过的路径: {possible_paths}")
+            # 使用第一个路径作为默认值,让ExcelFileManager处理错误
+            self.default_template_path = possible_paths[0]
+    
+    def download_template_from_uri(self, uri: str, product_name: str) -> Path:
+        """从URI下载模板文件到本地临时目录
+        
+        Args:
+            uri: 模板文件的URI(S3或HTTP)
+            product_name: 产品名称,用于生成唯一的文件名
+            
+        Returns:
+            Path: 下载的模板文件本地路径
+        """
+        try:
+            # 创建临时目录存储下载的模板
+            temp_dir = Path(f"{OUTPUT_DIR}/temp/templates")
+            temp_dir.mkdir(parents=True, exist_ok=True)
+            
+            # 为每个产品生成唯一的模板文件名
+            template_filename = f"template_{product_name}_{Path(uri).name}"
+            local_template_path = temp_dir / template_filename
+            
+            # 如果文件已存在且不为空,直接返回
+            if local_template_path.exists() and local_template_path.stat().st_size > 0:
+                logger.info(f"模板文件已存在: {local_template_path}")
+                return local_template_path
+            
+            logger.info(f"从URI下载模板文件: {uri}")
+            
+            # 使用read_file下载文件(支持S3和本地文件)
+            template_content = read_file(uri, mode='rb')
+            
+            # 保存到本地
+            with open(local_template_path, 'wb') as f:
+                f.write(template_content)
+            
+            logger.info(f"模板文件下载成功: {local_template_path}")
+            return local_template_path
+            
+        except Exception as e:
+            logger.error(f"下载模板文件失败 (URI: {uri}): {e}")
+            # 返回默认模板路径
+            if self.default_template_path:
+                logger.info(f"使用默认模板文件: {self.default_template_path}")
+                return self.default_template_path
+            else:
+                raise Exception(f"无法下载模板文件且无默认模板可用: {uri}")
+    
+    def get_products_from_db(self, month: Optional[int] = None, year: Optional[int] = None) -> List:
+        """从数据库获取产品数据,按产品名称分组
+        
+        Args:
+            month: 月份 (1-12),如果未提供则使用当前月份
+            year: 年份,如果未提供则使用当前年份
+            
+        Returns:
+            List: 产品数据列表,每个产品包含所有ASIN数据
+        """
+        try:
+            # 获取指定月份的产品导入记录,使用新的SQL查询
+            product_imports = self._get_monthly_product_imports_with_asin_data(
+                month=month, 
+                year=year
+            )
+            
+            # 按产品名称分组,将同一个产品的多个ASIN记录合并
+            grouped_products = {}
+            for record in product_imports:
+                product_name = record.get('product_name')
+                if not product_name:
+                    continue
+                    
+                if product_name not in grouped_products:
+                    # 创建产品记录,包含基本信息和ASIN列表
+                    grouped_products[product_name] = {
+                        'product_name': product_name,
+                        'uri': record.get('uri'),
+                        'filename': record.get('filename'),
+                        'created_at': record.get('created_at'),
+                        'competitor': record.get('competitor'),
+                        'asin_exists': record.get('asin_exists'),
+                        'asin_data': []  # 存储该产品的所有ASIN数据
+                    }
+                
+                # 添加ASIN数据(如果存在)
+                if record.get('asin'):
+                    # 检查是否已经存在相同的ASIN,避免重复添加
+                    existing_asins = [item['asin'] for item in grouped_products[product_name]['asin_data']]
+                    if record.get('asin') not in existing_asins:
+                        grouped_products[product_name]['asin_data'].append({
+                            'asin': record.get('asin'),
+                            'extra_result_path': record.get('extra_result_path'),
+                            'mhtml_path': record.get('mhtml_path')
+                        })
+                    else:
+                        logger.warning(f"产品 '{product_name}' 发现重复ASIN: {record.get('asin')},跳过重复记录")
+            
+            # 转换为列表格式
+            result = list(grouped_products.values())
+            logger.info(f"从数据库获取到 {len(result)} 个产品(共 {len(product_imports)} 条ASIN记录)")
+            return result
+            
+        except Exception as e:
+            logger.error(f"从数据库获取产品数据失败: {e}")
+            return []
+    
+    def _get_monthly_product_imports_with_asin_data(self, month: Optional[int] = None, year: Optional[int] = None) -> List[dict]:
+        """从数据库获取产品数据,包含asinseed表的数据
+        
+        Args:
+            month: 月份 (1-12),如果未提供则使用当前月份
+            year: 年份,如果未提供则使用当前年份
+            
+        Returns:
+            List[dict]: 包含asin数据的产品记录列表
+        """
+        from datetime import datetime
+        from sqlmodel import text, Session
+        from config.settings import DB_URL
+        from sqlmodel import create_engine
+        
+        try:
+            # 确定目标月份和年份
+            now = datetime.now()
+            target_month = month if month is not None else now.month
+            target_year = year if year is not None else now.year
+            
+            # 创建数据库引擎
+            engine = create_engine(DB_URL)
+            
+            # 构建SQL查询
+            sql_query = """
+            SELECT 
+                mpi.id, 
+                mpi.product_name, 
+                mpi.uri, 
+                mpi.filename, 
+                mpi.created_at, 
+                mpi.competitor, 
+                mpi.asin_exists, 
+                asinseed.asin, 
+                asinseed.extra_result_path, 
+                asinseed.mhtml_path 
+            FROM 
+                public.monthly_product_imports mpi 
+            LEFT JOIN 
+                public.asinseed 
+            ON 
+                mpi.competitor = asinseed.asin
+            WHERE 
+                date_trunc('month', mpi.created_at) = date_trunc('month', :target_date)
+            """
+            
+            with Session(engine) as session:
+                result = session.exec(text(sql_query).params(target_date=datetime(target_year, target_month, 1)))
+                rows = result.all()
+                
+                # 转换为字典列表
+                product_data = []
+                for row in rows:
+                    product_data.append(dict(row._mapping))
+                
+                logger.info(f"SQL查询返回 {len(product_data)} 条记录")
+                return product_data
+                
+        except Exception as e:
+            logger.error(f"执行SQL查询失败: {e}")
+            return []
+    
+    async def generate_excel_for_product(self, product_data: dict, output_dir: str = None) -> bool:
+        """为单个产品生成Excel文件
+        
+        Args:
+            product_data: 产品数据字典
+            output_dir: 输出目录,默认为None时使用配置的输出目录
+            
+        Returns:
+            bool: 生成成功返回True,失败返回False
+        """
+        product_name = product_data.get('product_name')
+        if not product_name:
+            logger.error(f"产品数据中缺少product_name字段: {product_data}")
+            return False
+        
+        # 设置输出路径
+        if output_dir is None:
+            output_dir = f"{OUTPUT_DIR}/generated_excels"
+        
+        output_path = Path(f"{output_dir}/extra-data-{product_name}.xlsx")
+        
+        # 确保输出目录存在
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        
+        # 获取模板文件路径
+        template_uri = product_data.get('uri')
+        if template_uri:
+            logger.info(f"使用产品特定的模板URI: {template_uri}")
+            template_path = self.download_template_from_uri(template_uri, product_name)
+        else:
+            logger.info(f"产品未提供模板URI,使用默认模板")
+            template_path = self.default_template_path
+        
+        # 创建Excel文件管理器
+        excel_manager = ExcelFileManager(
+            output_path=str(output_path),
+            template_path=template_path  # 保持为Path对象
+        )
+        
+        # 使用新的方法从PostgreSQL查询结果中构造competitor_crawl_data
+        competitor_crawl_data = await self._build_competitor_crawl_data_from_postgres(product_data)
+        
+        if competitor_crawl_data:
+            # 写入竞品分析数据到Excel
+            excel_manager.write_competitive_sheet(
+                competitor_crawl_data, 
+                sheet_name="竞品关键词调研", 
+                overwrite=True
+            )
+            logger.info(f"成功写入竞品分析数据到Excel")
+        else:
+            logger.warning(f"产品 '{product_name}' 没有竞品数据可写入")
+        
+        # 保存Excel文件
+        excel_manager.save_all()
+        logger.info(f"Excel文件已生成: {output_path}")
+        
+        return True
+            
+    
+    def _parse_product_extraction(self, product_data: dict):
+        """从ProductImport数据中解析ProductForExtraction对象"""
+        try:
+            import json
+            
+            product_data_json = product_data.get('product_data')
+            if not product_data_json:
+                logger.warning(f"产品数据中缺少product_data字段")
+                return None
+            
+            # 解析JSON数据
+            from src.flow_task.db.models.product_models import ProductForExtraction
+            return ProductForExtraction.model_validate_json(product_data_json)
+            
+        except Exception as e:
+            logger.error(f"解析产品提取数据失败: {e}")
+            return None
+    
+    async def _build_competitor_crawl_data_from_postgres(self, product_data: dict):
+        """从PostgreSQL查询结果中构造CompetitorCrawlData数据
+        
+        Args:
+            product_data: 从PostgreSQL查询返回的产品数据字典,包含asin_data列表
+            
+        Returns:
+            Dict[str, CompetitorCrawlData]: 竞品爬取数据字典,包含所有ASIN的数据
+        """
+        try:
+            import json
+            from src.models.product_model import CompetitorCrawlData, ExtraResultModel, TrafficKeywordResult
+            from utils.file import read_file
+            
+            competitor_crawl_data = {}
+            product_name = product_data.get('product_name')
+            
+            # 检查是否有ASIN数据列表
+            asin_data_list = product_data.get('asin_data', [])
+            if not asin_data_list:
+                logger.warning(f"产品 '{product_name}' 没有ASIN数据")
+                return None
+            
+            logger.info(f"产品 '{product_name}' 有 {len(asin_data_list)} 个ASIN需要处理")
+            
+            # 为每个ASIN创建CompetitorCrawlData对象
+            for asin_data in asin_data_list:
+                asin = asin_data.get('asin')
+                if not asin:
+                    logger.warning(f"ASIN数据中缺少asin字段,跳过")
+                    continue
+                
+                # 创建CompetitorCrawlData对象
+                competitor_data = CompetitorCrawlData(
+                    asin=asin,
+                    asin_area="JP",
+                    extra_result_path=asin_data.get('extra_result_path'),
+                    mhtml_path=asin_data.get('mhtml_path')
+                )
+                
+                # 如果有extra_result_path,尝试下载并解析数据
+                extra_result_path = asin_data.get('extra_result_path')
+                if extra_result_path:
+                    try:
+                        logger.info(f"尝试下载ASIN {asin} 的extra_result数据: {extra_result_path}")
+                        # 使用read_file下载文件(支持S3和本地文件)
+                        extra_result_content = read_file(extra_result_path, mode='rb')
+                        
+                        # 尝试解析JSON数据
+                        try:
+                            extra_result_json = json.loads(extra_result_content.decode('utf-8'))
+                            extra_result = ExtraResultModel.model_validate(extra_result_json)
+                            competitor_data.extra_result = extra_result
+                            logger.info(f"成功解析ASIN {asin} 的extra_result数据")
+                        except json.JSONDecodeError as e:
+                            logger.warning(f"解析ASIN {asin} 的extra_result JSON失败: {e}")
+                        except Exception as e:
+                            logger.warning(f"验证ASIN {asin} 的extra_result模型失败: {e}")
+                            
+                    except Exception as e:
+                        logger.warning(f"下载ASIN {asin} 的extra_result数据失败: {e}")
+                
+                # 使用competitor字段作为键名,如果没有则使用asin
+                competitor_key = product_data.get('competitor', asin)
+                # 如果有多个ASIN,使用asin作为键名以避免冲突
+                if len(asin_data_list) > 1:
+                    competitor_key = asin
+                
+                competitor_crawl_data[competitor_key] = competitor_data
+                logger.info(f"成功构造CompetitorCrawlData数据: {competitor_key} -> {asin}")
+            
+            return competitor_crawl_data
+            
+        except Exception as e:
+            logger.error(f"构造CompetitorCrawlData数据失败: {e}")
+            return None
+    
+    async def generate_excels_for_month(self, month: Optional[int] = None, year: Optional[int] = None) -> dict:
+        """为指定月份的所有产品生成Excel文件
+        
+        Args:
+            month: 月份 (1-12),如果未提供则使用当前月份
+            year: 年份,如果未提供则使用当前年份
+            
+        Returns:
+            dict: 包含成功和失败统计的字典
+        """
+        # 获取产品数据
+        products = self.get_products_from_db(month, year)
+        
+        if not products:
+            logger.warning(f"未找到指定月份的产品数据 (month={month}, year={year})")
+            return {"success": 0, "failed": 0, "total": 0}
+        
+        success_count = 0
+        failed_count = 0
+        
+        logger.info(f"开始为 {len(products)} 个产品生成Excel文件")
+        
+        # 为每个产品生成Excel文件
+        for product in products:
+            result = await self.generate_excel_for_product(product)
+            if result:
+                success_count += 1
+            else:
+                failed_count += 1
+        
+        summary = {
+            "success": success_count,
+            "failed": failed_count,
+            "total": len(products)
+        }
+        
+        logger.info(f"Excel生成完成: 成功 {success_count} 个, 失败 {failed_count} 个, 总计 {len(products)} 个")
+        return summary
+
+
+async def main():
+    """主函数 - 运行Excel生成流程"""
+    excel_flow = ExcelGeneratorFlow()
+    
+    # 可以指定月份和年份,如果不指定则使用当前月份
+    # 例如: 生成2025年6月的Excel文件
+    result = await excel_flow.generate_excels_for_month()
+    
+    print(f"\nExcel生成结果:")
+    print(f"成功: {result['success']} 个")
+    print(f"失败: {result['failed']} 个")
+    print(f"总计: {result['total']} 个")
+
+
+if __name__ == "__main__":
+    # 运行主函数
+    asyncio.run(main())