ソースを参照

完成竞品爬取和竞品表格写入。(因为mongo和AI调整的原因导致结构被修改,因此要重新解析写入)

mrh 11 ヶ月 前
コミット
0091c9e1bb

+ 0 - 172
src/ai/agent_ai_analysis.py

@@ -1,172 +0,0 @@
-import litellm
-from litellm import completion
-from pydantic import BaseModel
-from typing import Any, Dict, Optional
-from src.models.product_model import (
-    Product,
-    AIAnalyzeCompare,
-    AICompetitorAnalyzeMainKeywordsResult,
-    MarketingInfo
-)
-
-class AnalysisExecutor(BaseModel):
-    """分析执行器基类"""
-    product: Product
-    model_provider: str
-    
-    async def prepare_input_data(self) -> Dict:
-        """准备分析输入数据"""
-        raise NotImplementedError
-        
-    async def execute_analysis(self, input_data: Dict) -> Dict:
-        """执行AI分析并返回结构化结果"""
-        raise NotImplementedError
-
-class CompetitorAnalysisExecutor(AnalysisExecutor):
-    """竞品分析执行器"""
-    
-    async def prepare_input_data(self) -> Dict:
-        """组合竞品分析输入数据"""
-        return {
-            "base_info": self.product.basic_info.dict(),
-            "competitors": [
-                c.dict() for c in self.product.competitor_crawl_data.values()
-            ],
-            "extra": self.product.extra_result
-        }
-    
-    async def execute_analysis(self, input_data: Dict) -> Dict:
-        """执行竞品分析"""
-        prompt_template = f"""
-        你是一位专业的亚马逊市场分析师,请根据以下数据生成竞品分析报告:
-        产品基本信息:{input_data['base_info']}
-        竞品数据:{input_data['competitors']}
-        额外数据:{input_data['extra']}
-        
-        请按以下结构输出JSON:
-        {{
-            "main_keywords": [{{"keyword": "...", "reason": "..."}}],
-            "tail_keywords": [{{"keyword": "...", "monthly_searches": ...}}],
-            "comparison_analysis": "..." 
-        }}
-        """
-        
-        response = await completion(
-            model=self.model_provider,
-            messages=[{"role": "user", "content": prompt_template}],
-            response_format={"type": "json_object"}
-        )
-        
-        raw_result = response.choices[0].message.content
-        return AICompetitorAnalyzeMainKeywordsResult.parse_raw(raw_result)
-
-class MarketingAnalysisExecutor(AnalysisExecutor):
-    """营销分析执行器"""
-    
-    async def prepare_input_data(self) -> Dict:
-        """组合营销分析输入数据"""
-        return {
-            "product_info": self.product.basic_info.dict(),
-            "current_marketing": self.product.marketing.dict(),
-            "sales_data": self.product.variants
-        }
-    
-    async def execute_analysis(self, input_data: Dict) -> Dict:
-        """执行营销分析"""
-        prompt_template = f"""
-        根据产品信息生成营销方案:
-        产品信息:{input_data['product_info']}
-        当前营销数据:{input_data['current_marketing']}
-        销售数据:{input_data['sales_data']}
-        
-        输出要求:
-        {{
-            "title": "产品标题",
-            "selling_points": ["卖点1", "卖点2"],
-            "improvement_suggestions": "..." 
-        }}
-        """
-        
-        response = await completion(
-            model=self.model_provider,
-            messages=[{"role": "user", "content": prompt_template}],
-            response_format={"type": "json_object"}
-        )
-        
-        raw_result = response.choices[0].message.content
-        return MarketingInfo.parse_raw(raw_result)
-
-class AnalysisService:
-    """AI分析核心服务"""
-    
-    def __init__(self, product: Product):
-        self.product = product
-        self.snapshot_service = SnapshotService()
-
-    def _get_executor(self, analysis_type: AnalysisType, provider: ModelProvider):
-        """获取执行器实例"""
-        executor_map = {
-            AnalysisType.COMPETITOR: CompetitorAnalysisExecutor,
-            AnalysisType.MARKETING: MarketingAnalysisExecutor
-        }
-        return executor_map[analysis_type](
-            product=self.product,
-            model_provider=provider.value
-        )
-
-    async def execute_analysis(
-        self,
-        analysis_type: AnalysisType,
-        model_provider: ModelProvider,
-        compare_with_previous: bool = True
-    ) -> AIAnalyzeCompare:
-        """执行完整分析流程"""
-        # 1. 准备并执行分析
-        executor = self._get_executor(analysis_type, model_provider)
-        input_data = await executor.prepare_input_data()
-        raw_result = await executor.execute_analysis(input_data)
-
-        # 2. 创建分析快照
-        snapshot = await self.snapshot_service.create_snapshot(
-            product_id=str(self.product.id),
-            analysis_type=analysis_type,
-            model_provider=model_provider,
-            input_data=input_data,
-            output_data=raw_result.dict()
-        )
-
-        # 3. 构建并存储对比结果
-        compare_result = AIAnalyzeCompare(
-            model=model_provider.value,
-            analysis_type=analysis_type,
-            input_data_hash=snapshot.input_hash,
-            output_data_hash=snapshot.output_hash,
-            analysis_result=raw_result
-        )
-
-        # 4. 更新产品记录
-        await self._update_product_record(model_provider, analysis_type, compare_result)
-
-        # 5. 结果对比(如果需要)
-        if compare_with_previous:
-            previous = await self._get_previous_analysis(model_provider, analysis_type)
-            if previous:
-                compare_result.differences = self._compare_results(raw_result, previous)
-
-        return compare_result
-
-    async def _update_product_record(self, provider: ModelProvider, analysis_type: AnalysisType, result: AIAnalyzeCompare):
-        """更新产品分析记录"""
-        update_field = f"ai_analysis_compare.{provider.value}_{analysis_type.value}"
-        await self.product.update({update_field: result.dict()})
-
-    async def _get_previous_analysis(self, provider: ModelProvider, analysis_type: AnalysisType):
-        """获取最近一次同类型分析结果"""
-        field_name = f"{provider.value}_{analysis_type.value}"
-        return self.product.ai_analysis_compare.get(field_name)
-
-    def _compare_results(self, current: BaseModel, previous: BaseModel) -> Dict:
-        """对比两次分析结果差异"""
-        current_dict = current.dict()
-        previous_dict = previous.dict()
-        return DeepDiff(previous_dict, current_dict, ignore_order=True).to_dict()

+ 0 - 408
src/ai/agent_product.py

@@ -1,408 +0,0 @@
-from abc import ABC, abstractmethod
-import json
-from typing import Optional, Set, Union
-from llama_index.core import PromptTemplate
-import asyncio
-import aiofiles
-import os
-import sys
-from dotenv import load_dotenv
-from pydantic import BaseModel
-from src.models.product_model import (
-    AIAnalyzeCompare, Product, CompetitorCrawlData, AICompetitorAnalyzeMainKeywords,
-    TrafficKeywordResult, ProductImageInfo,AICompetitorAnalyzeMainKeywordsResult,
-    SearchAmazoneKeyResult, ProductBaseInfo, Variant,MarketingInfo,
-)
-from src.models.config_model import (UserConfig, AIPromptConfig, )
-from llama_index.llms.openai import OpenAI
-from llama_index.llms.litellm import LiteLLM
-from src.manager.core.db_mongo import BaseMongoManager
-from utils.logu import get_logger
-from src.models.field_config import FieldConfig,get_field_descriptions
-load_dotenv()
-logger = get_logger('ai')
-
-class ConfigManager:
-    _instance = None
-    _config = FieldConfig(
-        include_fields={
-            "ProductImageInfo": {"main_text"},
-            "TrafficKeywordResult": {"traffic_keyword", "monthly_searches"},
-            "ProductBaseInfo": {
-                "name", "content", "material", "color", "size",
-                "packaging_size", "weight", "main_usage", "selling_point"
-            },
-            "CompetitorCrawlData": {"asin"}
-        }
-    )
-
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
-
-    @classmethod
-    def get_field_config(cls) -> FieldConfig:
-        return cls._config
-
-    @classmethod
-    def update_config(cls, new_config: FieldConfig):
-        cls._config = new_config
-
-def get_competitor_prompt_data(
-    product: Product,
-    field_config: FieldConfig = ConfigManager.get_field_config()
-) -> list:
-    """
-    获取竞品提示数据
-    
-    Args:
-        product: 产品对象
-        field_config: 字段配置
-        
-    Returns:
-        结构化竞品数据列表
-    """
-    competitor_crawl_data = product.competitor_crawl_data
-    list_data = []
-    
-    for asin, crawl_data in competitor_crawl_data.items():
-        if crawl_data.extra_result:
-            structured_result = {"asin": asin}
-            
-            if crawl_data.extra_result.product_info:
-                structured_result["product_info"] = field_config.filter_model_dump(
-                    crawl_data.extra_result.product_info,
-                    "ProductImageInfo"
-                )
-                
-            if crawl_data.extra_result.result_table:
-                structured_result["result_table"] = [
-                    field_config.filter_model_dump(item, "TrafficKeywordResult")
-                    for item in crawl_data.extra_result.result_table
-                ]
-                
-            logger.debug(f"Structured result for LLM: {json.dumps(structured_result, indent=4, ensure_ascii=False)}")
-            list_data.append(structured_result)
-            
-    return list_data
-
-class PromptFormatter:
-    """LLM提示词模板格式化器"""
-    def __init__(self, template: str, **kwargs):
-        self.template = template
-        self.kwargs = kwargs
-        self.partial_kwargs = {}
-        self.var_mappings = {}
-        self.function_mappings = {}
-
-    def partial_format(self, **kwargs) -> "PromptFormatter":
-        """部分格式化模板"""
-        self.partial_kwargs.update(kwargs)
-        return self
-
-    def map_variables(self, **mappings) -> "PromptFormatter":
-        """映射模板变量名"""
-        self.var_mappings.update(mappings)
-        return self
-
-    def map_functions(self, **functions) -> "PromptFormatter":
-        """映射模板处理函数"""
-        self.function_mappings.update(functions)
-        return self
-
-    def format(self, **kwargs) -> str:
-        """最终格式化提示词"""
-        # 合并所有参数
-        all_kwargs = {**self.partial_kwargs, **kwargs}
-        
-        # 应用变量名映射
-        mapped_kwargs = {}
-        for key, value in all_kwargs.items():
-            mapped_key = self.var_mappings.get(key, key)
-            mapped_kwargs[mapped_key] = value
-        
-        # 应用函数处理
-        for key, func in self.function_mappings.items():
-            if key in mapped_kwargs:
-                mapped_kwargs[key] = func(**mapped_kwargs)
-        
-        return self.template.format(**mapped_kwargs)
-
-class Formatter(ABC):
-    """格式化器抽象基类"""
-    def __init__(self, notes: Optional[dict] = None):
-        self.notes = notes or {}
-
-    @abstractmethod
-    def format(self, fields_desc: dict) -> str:
-        pass
-
-class JSONFormatter(Formatter):
-    """JSON格式处理器"""
-    def format(self, fields_desc: dict) -> str:
-        json_output = json.dumps(fields_desc, indent=2, ensure_ascii=False)
-        if self.notes.get('json'):
-            return f"```json\n{json_output}\n```\n{self.notes['json']}"
-        return f"```json\n{json_output}\n```"
-
-class HumanFormatter(Formatter):
-    """人类可读格式处理器"""
-    def format(self, fields_desc: dict) -> str:
-        def process_dict(d, indent=0):
-            lines = []
-            for key, value in d.items():
-                prefix = " " * indent
-                if isinstance(value, dict):
-                    lines.append(f"{prefix}{key}:")
-                    lines.append(process_dict(value, indent + 2))
-                else:
-                    lines.append(f"{prefix}{value}: {{{key}}}")
-            return "\n".join(lines)
-
-        result = process_dict(fields_desc)
-        if self.notes.get('human'):
-            result += f"\n{self.notes['human']}"
-        return result
-
-class FormatterFactory:
-    """格式化器工厂类"""
-    @staticmethod
-    def create_formatter(format_type: str, notes: Optional[dict] = None) -> Formatter:
-        if format_type == "json":
-            return JSONFormatter(notes)
-        elif format_type == "human":
-            return HumanFormatter(notes)
-        raise ValueError(f"Unsupported format type: {format_type}")
-
-
-class LLMService:
-    """LLM服务抽象类"""
-    @abstractmethod
-    async def analyze(self, prompt: str) -> Union[dict, str]:
-        pass
-
-class LiteLLMService(LLMService):
-    """LiteLLM实现"""
-    def __init__(self, model: str = "openai/deepseek-chat", max_retries: int = 3,
-                 retry_delay: float = 1.0, format_type: str = "json"):
-        self.model = model
-        self.max_retries = max_retries
-        self.retry_delay = retry_delay
-        self.format_type = format_type
-
-    async def analyze(self, prompt: str) -> Union[dict, str]:
-        llm_kwargs = {}
-        if self.format_type == "json":
-            # if 'deepseek-r' not in self.model:
-                # llm_kwargs["additional_kwargs"] = {"response_format": {"type": "json_object"}}
-            prompt += "\n请确保输出的是有效的JSON对象。"
-        logger.info(f"{self.model} 调用参数: {llm_kwargs}")
-        llm = LiteLLM(model=self.model, **llm_kwargs)
-
-        for attempt in range(self.max_retries):
-            try:
-                logger.info(f"尝试第 {attempt + 1} 次LLM调用...")
-                completion = await llm.acomplete(prompt)
-                return self._process_response(completion.text)
-            except (json.JSONDecodeError, ValueError) as e:
-                if self.format_type == "json":
-                    logger.warning(f"JSON解析失败(尝试 {attempt + 1}/{self.max_retries}): {str(e)}")
-                    if attempt < self.max_retries - 1:
-                        await asyncio.sleep(self.retry_delay)
-                    else:
-                        raise ValueError(f"无法获取有效的JSON响应: {str(e)}")
-            except Exception as e:
-                logger.exception(f"LLM调用失败: {str(e)}")
-                raise
-
-    def _process_response(self, response_text: str) -> Union[dict, str]:
-        if self.format_type == "json":
-            if "```json" in response_text:
-                json_str = response_text.split("```json")[1].split("```")[0].strip()
-            else:
-                json_str = response_text
-
-            result = json.loads(json_str)
-            if not isinstance(result, dict):
-                raise ValueError("响应不是有效的JSON对象")
-
-            logger.debug(f"LLM响应验证通过: {json.dumps(result, indent=2, ensure_ascii=False)}")
-            return result
-        return response_text
-
-class AnalysisService:
-    """分析领域服务"""
-    def __init__(self, llm_service: LLMService, db_manager: BaseMongoManager):
-        self.llm_service:LiteLLMService = llm_service
-        self.db_manager = db_manager
-
-    async def execute_analysis(self, product:Product, format_type: str = "json", dry_run=False, template: Optional[AIPromptConfig] = None) -> tuple[dict, str]:
-        # if template:
-        #     formatter = PromptFormatter(template.template)
-        #     if template.keywords:
-        #         formatter.partial_format(**template.keywords)
-        #     prompt = formatter.format(
-        #         product=product,
-        #         format_type=format_type
-        #     )
-        # else:
-        #     prompt = await self._prepare_prompt(product, format_type)
-            
-        # logger.info(f"prompt: {prompt}")
-        # analysis_result = await self.llm_service.analyze(prompt)
-        # return analysis_result, prompt
-        pass
-
-    async def execute_marketing_analysis(self, product: Product, format_type: str = "json", template: Optional[AIPromptConfig] = None) -> tuple[MarketingInfo, str]:
-        """
-        执行营销文案分析
-        
-        Args:
-            product: 产品对象
-            format_type: 输出格式
-            template: 自定义提示模板
-            
-        Returns:
-            (分析结果, 使用的提示词)
-        """
-        if template:
-            formatter = PromptFormatter(template.template)
-            if template.keywords:
-                formatter.partial_format(**template.keywords)
-            prompt = formatter.format(
-                product=product,
-                format_type=format_type
-            )
-        else:
-            prompt = f'''我是亚马逊运营,请为产品 {product.basic_info.name} 生成营销文案。
-            
-产品信息:
-{product.basic_info.model_dump_json(indent=2)}
-
-要求:
-- 突出产品卖点: {', '.join(product.basic_info.selling_point)}
-- 适合日本市场风格
-- 包含吸引人的标题和详细描述'''
-
-        logger.info(f"营销分析提示词: {prompt}")
-        analysis_result = await self.llm_service.analyze(prompt)
-        
-        try:
-            marketing_info = MarketingInfo(**analysis_result)
-            return marketing_info, prompt
-        except Exception as e:
-            logger.error(f"营销分析结果解析失败: {str(e)}")
-            raise ValueError("营销分析结果格式不正确") from e
-
-    async def _prepare_competitor_prompt(self, product: Product, template: AIPromptConfig) -> str:
-        """使用llmaindex模板方法格式化提示词
-        
-        Args:
-            product: 产品对象
-            template: 提示模板配置
-            
-        Returns:
-            格式化后的提示词字符串
-        """
-        output_fields = get_field_descriptions(
-            AICompetitorAnalyzeMainKeywordsResult,
-            exclude=['results.crawl_result', 'results.created_at']
-            )
-        formatter = FormatterFactory.create_formatter(self.llm_service.format_type)
-        output_format = formatter.format(output_fields)
-
-        competitor_data = get_competitor_prompt_data(product)
-        basic_template =f'''各个字段说明:
-{get_field_descriptions(CompetitorCrawlData, include=['asin'])}
-{get_field_descriptions(ProductImageInfo, include=['main_text'])}
-{get_field_descriptions(TrafficKeywordResult, include=['traffic_keyword', 'monthly_searches'])}
-
-竞品数据:
-{competitor_data}
-
-我的产品信息如下:
-{product.basic_info.model_dump_json(indent=2)}
-
-返回格式:
-{output_format}
-----
-'''
-        template.template = basic_template + template.template
-        return template.template
-
-    @staticmethod
-    def convert_monthly_searches(value):
-        if value is None:
-            return None
-        if isinstance(value, str):
-            if not value.strip():
-                return None
-            return int(value.replace(',', ''))
-        return value
-    async def run_competitor_analysis(self, product: Product,
-        ai_analyze_compare_model: AIAnalyzeCompare,
-        format_type: str = 'json',):
-        prompt = await self._prepare_competitor_prompt(product, ai_analyze_compare_model.competitor_template)
-        logger.info(f"_prepare_competitor_prompt {prompt}")
-        analyze_result = await self.llm_service.analyze(prompt)
-        if 'results' in analyze_result:
-            for result in analyze_result['results']:
-                if 'monthly_searches' in result:
-                    result['monthly_searches'] = self.convert_monthly_searches(result['monthly_searches'])
-        
-        if 'tail_keys' in analyze_result:
-            for tail_key in analyze_result['tail_keys']:
-                if 'monthly_searches' in tail_key:
-                    tail_key['monthly_searches'] = self.convert_monthly_searches(tail_key['monthly_searches'])
-        return analyze_result
-    async def _prepare_prompt(self, product: Product, format_type: str = "json", main_key_num: int = 3, tail_key_num:int = 12) -> str:
-        competitor_data = get_competitor_prompt_data(product)
-        # 从数据模型获取输出字段描述
-        output_fields = get_field_descriptions(
-            AICompetitorAnalyzeMainKeywordsResult,
-            exclude=['results.crawl_result', 'results.created_at']
-            )
-        formatter = FormatterFactory.create_formatter(format_type)
-        output_format = formatter.format(output_fields)
-
-        return f'''各个字段说明:
-{get_field_descriptions(CompetitorCrawlData, include=['asin'])}
-{get_field_descriptions(ProductImageInfo, include=['main_text'])}
-{get_field_descriptions(TrafficKeywordResult, include=['traffic_keyword', 'monthly_searches'])}
-
-竞品数据:
-{competitor_data}
-
-我的产品信息如下:
-{product.basic_info.model_dump_json(indent=2)}
-----
-我是日本站的亚马逊运营,我在给产品名称为 {product.basic_info.name} 选主要关键词和长尾关键词。
-
-请根据以上 {len(competitor_data)} 个竞品数据,按以下规则分析:
-- 选出搜索量在1万以上的相同关键词作为主要关键词{main_key_num}个。
-- 如果竞品的搜索量都不足1万,则从排名前十的关键词中筛选 {main_key_num} 个搜索量最大且相关性最强的词。
-- 结合日本市场特点分析
-- 根据我的产品基本信息,从竞品的主要信息和同类竞品的相似关键词中,筛选出最符合我产品的长尾关键词 tail_keys {tail_key_num} 个以上
-
-筛选长尾词的示例:
-- 假设我的产品是电线保护,那么竞品关键词中,“隐藏排线管” 就不符合长尾词
-- 假设我的产品是“防老化、防动物咬”用途,你就不能在竞品数据中选择不属于我这个使用场景的长尾关键词。
-
-输出格式:
-{output_format}'''
-
-async def main():
-    logger.info(f"base url {os.environ.get('OPENAI_API_BASE')}")
-    db_manager = BaseMongoManager()
-    llm_service = LiteLLMService(format_type='json')
-    analysis_service = AnalysisService(llm_service, db_manager)
-
-    try:
-        result = await analysis_service.execute_analysis("电线保护套")
-        logger.info(f"分析结果: {json.dumps(result, indent=2, ensure_ascii=False)}")
-    except ValueError as e:
-        logger.error(f"分析失败: {str(e)}")
-
-if __name__ == "__main__":
-    asyncio.run(main())

+ 0 - 44
src/ai/run_service.py

@@ -1,44 +0,0 @@
-from src.models.product_model import Product
-from src.models.config_model import UserConfig
-from src.models.ai_execution_record import (
-    CompetitorKeywordAnalysis, MarketingContentGeneration, LLMConfig,
-    AICompetitorAnalyzeMainKeywordsResult,
-    MarketingInfo,)
-from src.manager.core.db_mongo import BaseMongoManager
-from src.ai.ai_executor import TaskRunner
-import asyncio
-from utils.logu import get_logger
-from src.models.field_config import FieldConfig, get_field_descriptions
-from dotenv import load_dotenv
-load_dotenv()
-logger = get_logger('ai')
-
-async def example_usage():
-    """更新后的示例调用代码"""
-    db_manager = BaseMongoManager()
-    await db_manager.initialize()
-    
-    # 获取产品及用户配置
-    product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    llm_config = LLMConfig(model_name="openai/deepseek-chat")
-    
-    # 竞品分析任务
-    competitor_model = CompetitorKeywordAnalysis(
-        product=product, 
-        executor_config=llm_config,
-        prompting=user.prompting.get("竞品和长尾词分析")
-    )
-    await task_runner.run_task(competitor_model)
-    logger.info(f"竞品分析结果: {competitor_model.result}")
-    
-    # 营销文案生成任务
-    marketing_model = MarketingContentGeneration(
-        product=product,
-        executor_config=llm_config,
-        prompting=user.prompting.get("营销文案"), 
-    )
-    await task_runner.run_task(marketing_model)
-    logger.info(f"营销文案结果: {marketing_model.result}")
-
-if __name__ == "__main__":
-    asyncio.run(example_usage())

+ 17 - 8
src/excel_tools/file_manager.py

@@ -14,13 +14,13 @@ from src.manager import DbManager,StorageManager
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
 from src.models.asin_model import AsinExtraResultModel
-from src.models.product_model import AICompetitorAnalyzeMainKeywordsResult, Product,CompetitorCrawlData
+from src.models.product_model import Product,CompetitorCrawlData
 from src.manager.core.db_mongo import BaseMongoManager
 
 logger = get_logger('excel')
 
 class ExcelFileManager:
-    TEMPLATE_PATH = Path(f"{OUTPUT_DIR}/resource/文案制作-template.xlsx")
+    TEMPLATE_PATH = Path(f"{OUTPUT_DIR}/resource/大尺寸化妆棉-文案制作模版.xlsx")
     """Excel文件协调管理器"""
     def __init__(self, output_path: str=None, template_path: str = None):
         self.output_path = Path(output_path)
@@ -34,7 +34,16 @@ class ExcelFileManager:
     def _prepare_workbook(self):
         """准备工作簿"""
         if not self.output_path.exists():
-            shutil.copy(self.template_path, self.output_path)
+            # 确保输出目录存在
+            self.output_path.parent.mkdir(parents=True, exist_ok=True)
+            
+            shutil.copy2(self.template_path, self.output_path)
+            # 设置文件为可读写(兼容Windows)
+            try:
+                import os
+                os.chmod(self.output_path, 0o666)
+            except Exception as e:
+                logger.warning(f"设置文件权限失败: {e}")
             
         return load_workbook(self.output_path)
 
@@ -50,7 +59,7 @@ class ExcelFileManager:
             competitive_sheet_writer = CompetitiveAnalysisWriter(self.wb, sheet_name=sheet_name, sheet_index=sheet_index)
             competitive_sheet_writer.add_data(competitor_crawl_data)
 
-    def write_product_info(self, analyze_data: AICompetitorAnalyzeMainKeywordsResult, sheet_name: str = "产品关键词分析", sheet_index: int = 1, overwrite: bool = False):
+    def write_product_info(self, analyze_data, sheet_name: str = "产品关键词分析", sheet_index: int = 1, overwrite: bool = False):
         """写入产品关键词分析数据"""
         if overwrite and sheet_name in self.wb.sheetnames:
             self.wb.remove(self.wb[sheet_name])
@@ -61,15 +70,15 @@ class ExcelFileManager:
         return self.s3_storage_manager.load_s3_complete_extract_data()
 
 async def main():
-    self = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\multi_data.xlsx")
+    self = ExcelFileManager(r"G:\code\amazone\copywriting_production\output\resource\extra-data-大尺寸厚款卸妆棉240片.xlsx")
     db_mongo = BaseMongoManager()  # 自动初始化
     await db_mongo.initialize()
-    product = await Product.find_one(Product.basic_info["name"] == "电线保护套")
+    product = await Product.find_one(Product.basic_info["name"] == "大尺寸厚款卸妆棉240片")
 
     logger.info(f"{product}")
     extract_data_lsit = product.competitor_crawl_data
-    self.write_competitive_sheet(product.competitor_crawl_data)
-    self.write_product_info(product.competitor_analyze,overwrite=True)
+    self.write_competitive_sheet(product.competitor_crawl_data, overwrite=True)
+    # self.write_product_info(product.competitor_analyze,overwrite=True)
     self.save_all()
     return
     competi_sheet = CompetitiveAnalysisWriter(excel_file.output_path)

+ 54 - 45
src/excel_tools/writers/competitive_analysis.py

@@ -13,52 +13,54 @@ from utils.file import read_file
 from utils.logu import get_logger
 from openpyxl import load_workbook,Workbook
 from .base_writer import ExcelWriterBase
-from src.models.product_model import Product,CompetitorCrawlData
+from src.models.product_model import Product,CompetitorCrawlData,ProductImageInfo
 
 
 logger = get_logger('excel')
 
 class ProductDataProcessor:
-    """JSON数据处理中心"""
-    def __init__(self, json_data: Dict, asin: str):
-        self.json_data = json_data
-        self.asin = asin
+    """竞品数据处理中心(适配Pydantic模型版本)"""
+    def __init__(self, crawl_data: CompetitorCrawlData):
+        self.crawl_data = crawl_data
         self._validate_data()
         
     def _validate_data(self):
-        """数据校验"""
-        if 'result_table' not in self.json_data:
-            raise ValueError("Missing required 'result_table' in JSON data")
+        """数据校验(适配Pydantic模型结构)"""
+        if not (self.crawl_data.extra_result and self.crawl_data.extra_result.result_table):
+            raise ValueError(f"竞品数据不完整: ASIN={self.crawl_data.asin} "
+                             f"缺少result_table或extra_result数据")
 
     def get_sorted_dataframe(self) -> pd.DataFrame:
-        """获取排序后的DataFrame"""
-        df = pd.DataFrame(self.json_data['result_table'])
+        """从模型获取排序后的DataFrame"""
+        # 转换Pydantic模型到字典列表
+        result_dicts = [item.dict() for item in self.crawl_data.extra_result.result_table]
+        df = pd.DataFrame(result_dicts)
         
-        # 数据清洗和类型转换
-        df['monthly_searches'] = df['monthly_searches'].apply(
-            lambda x: int(str(x).replace(',', '')) if x else 0
-        )
+        # 增强类型转换处理
+        df['monthly_searches'] = pd.to_numeric(
+            df['monthly_searches'].astype(str).str.replace(',', ''),
+            errors='coerce'
+        ).fillna(0).astype(int)
+        
+        # 处理可能缺失的字段
+        df['amazon_search_link'] = df.get('amazon_search_link', '')
         
         # 过滤无效数据并排序
         df = df[df['traffic_keyword'].notna()].sort_values(
-            by='monthly_searches', 
+            by='monthly_searches',
             ascending=False
         )
         return df.reset_index(drop=True)
 
     @property
-    def product_info(self) -> Dict:
-        """获取产品信息"""
-        return self.json_data.get('product_info', {})
+    def product_info(self) -> ProductImageInfo:
+        """从模型获取产品信息(带空值保护)"""
+        return self.crawl_data.extra_result.product_info if self.crawl_data.extra_result else ProductImageInfo()
 
     @property
     def unique_words(self) -> List[str]:
-        """获取唯一词列表"""
-        return [
-            str(word['word']).strip() 
-            for word in self.json_data.get('unique_words', [])
-            if 'word' in word
-        ]
+        """从模型获取唯一词列表(带空值保护)"""
+        return self.crawl_data.extra_result.unique_words if self.crawl_data.extra_result else []
 
 class CompetitiveAnalysisWriter(ExcelWriterBase):
     """竞品分析工作表写入器"""
@@ -99,15 +101,15 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
         """
         try:
             # 参数校验
-            if not analysis.extra_result:
-                raise ValueError(f"竞品分析数据缺失: {analysis.asin}")
+            # 增强参数校验
             if not analysis.asin:
-                raise ValueError("竞品ASIN不能为空")
+                raise ValueError(f"无效竞品数据: ASIN为空")
+            if not analysis.extra_result:
+                raise ValueError(f"竞品数据缺失extra_result: ASIN={analysis.asin}")
                 
             # 初始化数据处理
             processor = ProductDataProcessor(
-                json_data=analysis.extra_result,
-                asin=analysis.asin
+                crawl_data=analysis
             )
             
             # 记录产品起始列
@@ -130,8 +132,10 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
 
     
     def _write_main_table(self, processor: ProductDataProcessor, asin: str):
-        """写入主表格数据"""
+        """写入主表格数据(适配模型版本)"""
         df = processor.get_sorted_dataframe()
+        # 添加空值处理
+        df['amazon_search_link'] = df['amazon_search_link'].fillna('')
         
         # 写入表头
         # 标题行下移到第3行(图片占1-2行)
@@ -153,7 +157,7 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
             
             # 关键词(带超链接)
             kw_cell = self.ws.cell(data_row, self.current_col, row['traffic_keyword'])
-            if pd.notna(row.get('amazon_search_link')):
+            if pd.notna(row.get('amazon_search_link')) and row['amazon_search_link']:
                 kw_cell.hyperlink = row['amazon_search_link']
                 kw_cell.font = Font(color='0000FF', underline='single')  # 添加蓝色下划线样式
             
@@ -177,10 +181,10 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
         product_title_cell = self.ws.cell(start_row, self.current_col, "产品信息")
         product_title_cell.font = Font(bold=True)
         # 从product_info提取实际存在的字段
-        info_text = processor.product_info.get('main_text', '')
+        info_text = processor.product_info.main_text if processor.product_info else ''
         info_cell = self.ws.cell(start_row+1, self.current_col, info_text)
-        if processor.product_info.get('goto_amazon'):
-            product_title_cell.hyperlink = processor.product_info['goto_amazon']
+        if processor.product_info and processor.product_info.goto_amazon:
+            product_title_cell.hyperlink = processor.product_info.goto_amazon
             product_title_cell.font = Font(color='0000FF', underline='single')  # 添加超链接样式
         info_cell.alignment = Alignment(wrap_text=True, vertical='top')
         self.ws.column_dimensions[get_column_letter(self.current_col)].width = 35
@@ -190,28 +194,33 @@ class CompetitiveAnalysisWriter(ExcelWriterBase):
         for idx, word in enumerate(processor.unique_words, start=1):
             self.ws.cell(start_row+4+idx, self.current_col, word)
 
-    def _insert_product_image(self, product_info: Dict):
+    def _insert_product_image(self, product_info: ProductImageInfo):
         """插入产品图片"""
-        img_base64 = product_info.get('imgbase64')
-        if not img_base64:
+        if not product_info or not product_info.img_path:
             return
             
         try:
-            img_data = base64.b64decode(img_base64)
-            img = Image(BytesIO(img_data))
+            # 直接读取图片二进制数据
+            img_bytes = read_file(product_info.img_path, mode='rb')
+            if not img_bytes:
+                logger.warning(f'图片数据为空: {product_info.img_path}')
+                return
+                
+            img = Image(BytesIO(img_bytes))
             
-            # 图片位置:附加信息上方
-            # 图片插入到第1行(标题之前)
+            # 设置图片位置(标题行上方)
             img_row = 1
             img.anchor = f'{get_column_letter(self.current_col)}{img_row}'
             self.ws.add_image(img)
             
-            # 调整行高并预留空间
+            # 调整行高适配图片
             self.ws.row_dimensions[img_row].height = 150
-            # 更新最大数据行数(数据从第5行开始)
-            self.max_data_rows = max(self.max_data_rows, 5)
+            self.max_data_rows = max(self.max_data_rows, 5)  # 保持数据行起始位置
+            
         except Exception as e:
-            logger.error(f'图片插入失败: {e}')
+            logger.exception(f'图片插入失败: {e}')
+            # 记录详细错误信息但继续执行
+            logger.debug(f'图片路径: {product_info.img_path}', exc_info=True)
 
     def apply_formatting(self):
         """应用最终格式"""

+ 1 - 2
src/excel_tools/writers/product_info.py

@@ -4,7 +4,6 @@ from openpyxl.utils import get_column_letter
 from typing import Dict, Any, List
 from openpyxl import Workbook
 from .base_writer import ExcelWriterBase
-from src.models.product_model import AICompetitorAnalyzeMainKeywordsResult
 from utils.logu import get_logger
 
 logger = get_logger('excel')
@@ -29,7 +28,7 @@ class ProductInfoWriter(ExcelWriterBase):
             self.ws = self.wb.create_sheet(self.sheet_name, index=self.sheet_index)
             logger.info(f"新建工作表: {self.sheet_name}")
         
-    def add_data(self, analyze_data: AICompetitorAnalyzeMainKeywordsResult):
+    def add_data(self, analyze_data):
         """添加产品分析数据"""
         if not analyze_data or not analyze_data.results:
             logger.warning("无有效分析数据可写入")

+ 2 - 2
src/manager/core/db_mongo.py

@@ -9,7 +9,7 @@ from config.settings import MONGO_URL, MONGO_DB_NAME
 from src.models.product_model import Product
 from beanie.operators import Set, Rename
 from src.models.config_model import UserConfig, AIPromptConfig
-from src.models.ai_execution_record import BaseAIExecution, MarketingContentGeneration
+from src.models.ai_execution_record import BaseAIExecution, MarketingContentGeneration,AgentContent
 from src.models.template_model import Template
 class BaseMongoManager:
     _instance = None
@@ -43,8 +43,8 @@ class BaseMongoManager:
                     document_models=[
                         Product,
                         UserConfig,
-                        BaseAIExecution,
                         Template,
+                        AgentContent,
                     ])
                 self._is_initialized = True
 

+ 29 - 12
src/manager/manager_task.py

@@ -11,11 +11,9 @@ from config.celery import app
 # Remove direct task imports
 from celery.result import AsyncResult
 from src.models.product_model import (
-    Product,CompetitorCrawlData,AICompetitorAnalyzeMainKeywords, 
-    SearchAmazoneKeyResult, ProductBaseInfo, Variant,AICompetitorAnalyzeMainKeywordsResult,
-    AIAnalyzeCompare
+    Product,CompetitorCrawlData, 
+    SearchAmazoneKeyResult, ProductBaseInfo, Variant,
     )
-from src.ai.agent_product import AnalysisService, LiteLLMService
 
 from src.models.config_model import (UserConfig, AIPromptConfig, )
 from src.models.field_config import FieldConfig
@@ -34,6 +32,7 @@ class ManagerTask:
         if model and model.mhtml_path:
             logger.info(f"{asin}已经爬取过,跳过")
             return model
+        model = AsinSeed(asin=asin, asin_area=asin_area)
         """提交任务并等待完成,保存结果路径到数据库"""
         # 提交celery任务
         task = app.send_task('tasks.crawl_asin_save_task.get_asin_and_save_page', args=[asin, asin_area, overwrite])
@@ -95,7 +94,7 @@ class ManagerTask:
             else:
                 self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
             return asin_seed
-    async def submit_suggestion_task_and_wait(self, ai_competitor_anlayze_model: AICompetitorAnalyzeMainKeywords, timeout: int = 300):
+    async def submit_suggestion_task_and_wait(self, ai_competitor_anlayze_model, timeout: int = 300):
         """异步提交关键词建议任务并等待完成,保存结果到数据库"""
         # 检查是否已有缓存结果
         if ai_competitor_anlayze_model.crawl_result:
@@ -273,9 +272,9 @@ class ManagerTask:
     async def _run_single_analysis(
         self,
         product: Product,
-        ai_analyze_compare_model: AIAnalyzeCompare,
+        ai_analyze_compare_model,
         format_type: str = 'json',
-    ) -> AIAnalyzeCompare:
+    ):
         """
         执行单个模型的AI分析
         
@@ -314,14 +313,32 @@ async def main():
     return
 async def run_asinseed_task():
     manager = ManagerTask()
-    product = await Product.find_one(Product.basic_info.name == "电线保护套")
-    asinseed_list = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
-
-    for asin in product.competitor_crawl_data.keys():
+    asinseed_list = ['B0BTHX39VZ', 'B081SQRGZP', 'B003UOO8PG', 'B01DNS2FP8', 'B07YQ3BH96']
+    return
+    for asin in asinseed_list:
         logger.info(f"{asin}")
         manager.submit_asinseed_task_and_wait(asin)
         manager.submit_extract_task_and_wait(asin)
     # result = {'status': 'success', 'path': 's3://public/amazone/copywriting_production/output/B0B658JC22/B0B658JC22.mhtml'}
     # manager.save_task_asin_crawl_result('B0B658JC22', 'JP', result)
+
+async def asinseed_to_mongo():
+    manager = ManagerTask()
+    await manager.db_mongo.initialize()
+    # product = await Product.find_one(Product.basic_info.name == "电线保护套")
+    product = Product(basic_info=ProductBaseInfo(name="大尺寸厚款卸妆棉240片"), )
+    asinseed_list = ['B0BTHX39VZ', 'B081SQRGZP', 'B003UOO8PG', 'B01DNS2FP8', 'B07YQ3BH96']
+    for asin in asinseed_list:
+        logger.info(f"{asin}")
+        asin_model = manager.db.get_asin_seed(asin)
+        crawl_data = read_file(asin_model.extra_result_path)
+        product.competitor_crawl_data[asin_model.asin] = CompetitorCrawlData(
+            asin=asin_model.asin, 
+            asin_area=asin_model.asin_area,
+            mhtml_path=asin_model.mhtml_path,
+            extra_result=json.loads(crawl_data)
+            )
+    await product.save()
+
 if __name__ == "__main__":
-    asyncio.run(main())
+    asyncio.run(asinseed_to_mongo())

+ 13 - 9
src/manager/template_manager.py

@@ -10,8 +10,8 @@ import asyncio
 logger = get_logger('template')
 
 class TemplateManager(BaseMongoManager):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, mongo_url: str = None, db_name: str = None):
+        super().__init__(mongo_url=mongo_url, db_name=db_name)
         self.env = Environment(loader=BaseLoader())
         self.env.filters['tojson'] = lambda v: json.dumps(v, default=json_util.default)
 
@@ -43,7 +43,7 @@ class TemplateManager(BaseMongoManager):
         return template
 
     async def create_or_update_template(self, name: str, template_str: str,
-                                     template_type: TemplateType,
+                                     template_type: TemplateType=TemplateType.AGGREGATION,
                                      description: str = None,
                                      collection_name: str = None,
                                      if_exists: str = "update") -> Template:
@@ -131,14 +131,15 @@ class TemplateManager(BaseMongoManager):
         """渲染模板字符串"""
         template = self.env.from_string(template_str)
         rendered = template.render(**context)
-        
         try:
+            # 先尝试JSON解析
             return json.loads(rendered, object_hook=json_util.object_hook)
-        except json.JSONDecodeError as e:
-            logger.error(f"Template rendering failed: {e}\nRendered content:\n{rendered}")
-            raise ValueError(f"Invalid JSON after rendering: {e}")
+        except json.JSONDecodeError:
+            # 如果JSON解析失败,尝试作为Python字面量解析
+            import ast
+            return ast.literal_eval(rendered)
 
-    async def execute_template(self, name: str, context: Dict[str, Any], 
+    async def execute_template(self, name: str, context: Dict[str, Any]={}, 
                              collection_name: str = None):
         """执行模板查询"""
         template = await self.get_template(name)
@@ -146,7 +147,10 @@ class TemplateManager(BaseMongoManager):
             raise ValueError(f"Template '{name}' not found")
             
         pipeline = self.render_template(template.template_str, context)
-        
+        # if context:
+        #     pipeline = self.render_template(template.template_str, context)
+        # else:
+        #     pipeline = json.loads(template.template_str)
         if not collection_name and template.collection_name:
             collection_name = template.collection_name
         elif not collection_name and template.template_type == TemplateType.AGGREGATION:

+ 25 - 10
src/models/ai_execution_record.py

@@ -65,12 +65,12 @@ class AICompetitorAnalyzeMainKeywords(BaseModel):
     )
     created_at:Optional[datetime] = Field(default_factory=datetime.now)
 
+class TailKey(BaseModel):
+    tail_key:str = Field(default=None, description="长尾关键词")
+    monthly_searches:Optional[int] = Field(default=None, description="月搜索量")
+    reason:Optional[str] = Field(default=None, description="选择该长尾关键词原因")
+
 class AICompetitorAnalyzeMainKeywordsResult(BaseModel):
-    class TailKey(BaseModel):
-        tail_key:str = Field(default=None, description="长尾关键词")
-        monthly_searches:Optional[int] = Field(default=None, description="月搜索量")
-        reason:Optional[str] = Field(default=None, description="选择该长尾关键词原因")
-        
     results:Optional[List[AICompetitorAnalyzeMainKeywords]] = []
     supplement:Optional[str] = Field(
         default=None,
@@ -83,6 +83,7 @@ class AICompetitorAnalyzeMainKeywordsResult(BaseModel):
 class CompetitorKeywordAnalysis(BaseAIExecution):
     """竞品关键词分析结果"""
     task_type: str = "competitor_analysis"
+    model_name: str = ""
     result: Optional[AICompetitorAnalyzeMainKeywordsResult] = Field(
         default=None,
         description="竞品关键词分析结果"
@@ -94,7 +95,7 @@ class MarketingInfo(BaseModel):
     st_search: Optional[str] = None
     selling_point: Optional[List[str]] = None
     product_introduction: Optional[str] = None
-
+    reason: Optional[str] = None
 
 
 class MarketingContentGeneration(BaseAIExecution):
@@ -106,7 +107,21 @@ class MarketingContentGeneration(BaseAIExecution):
     )
 
 # 未来扩展 Agent 、 deep searcher 示例
-class AgentMarketingContent(BaseAIExecution):
-    task_type: str = "marketing_generation"
-    result: MarketingInfo
-    executor_config: AgentConfig = AgentConfig(agent_name="marketing_agent",executor_type='qwen/agent_v2')
+class AgentContent(Document):
+    product: Optional[Link["Product"]] = None
+    product_name: Optional[str] = Field(
+        default=None,
+        description="商品名称" 
+    )
+    model_name: str = ""
+    marketing: Optional[MarketingInfo] = Field(
+        default=None,
+        description="生成的营销内容结果"
+    )
+    competitor:Optional[AICompetitorAnalyzeMainKeywordsResult] = Field(
+        default=None,
+        description="竞品关键词分析结果" 
+    )
+    create_time:Optional[datetime] = Field(default_factory=datetime.now)
+    class Settings:
+        name = "agent.product"

+ 10 - 6
src/models/sql/create_user.sql

@@ -1,10 +1,14 @@
 -- 创建用户(仅在用户不存在时创建)
 DO $$
 BEGIN
-  IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'user') THEN
-    CREATE USER "user" WITH PASSWORD 'user';
-  END IF;
+    IF NOT EXISTS (
+        SELECT 1
+        FROM pg_roles
+        WHERE rolname = 'prefect'
+    ) THEN
+        CREATE ROLE prefect WITH LOGIN PASSWORD 'prefect123';
+    END IF;
 END $$;
-
--- 授予用户对数据库 "amazone" 的连接权限
-GRANT CONNECT ON DATABASE copywriting_production TO "user";
+CREATE DATABASE prefect;
+-- 将数据库的所有者设置为用户
+ALTER DATABASE prefect OWNER TO prefect;

+ 3 - 0
src/models/sql/grant_permissions.sql

@@ -1,3 +1,6 @@
+-- 创建数据库
+CREATE DATABASE prefect;
+
 -- 授予 public 模式的权限
 GRANT CREATE, USAGE ON SCHEMA public TO "user";