| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import litellm
- from litellm import completion
- from pydantic import BaseModel
- from typing import Any, Dict, Optional
- from src.models.product_model import (
- Product,
- AIAnalyzeCompare,
- AICompetitorAnalyzeMainKeywordsResult,
- MarketingInfo
- )
class AnalysisExecutor(BaseModel):
    """Base class for AI analysis executors.

    Subclasses override :meth:`prepare_input_data` to assemble the model
    input from ``product`` and :meth:`execute_analysis` to call the model
    and parse its reply.
    """

    # Product whose data is analysed.
    product: Product
    # Model identifier; subclasses pass it to the LLM client as ``model``.
    model_provider: str

    async def prepare_input_data(self) -> Dict:
        """Build the input payload for the analysis.

        Returns:
            A dict that is later handed to :meth:`execute_analysis`.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    async def execute_analysis(self, input_data: Dict) -> BaseModel:
        """Run the AI analysis and return the structured result.

        Args:
            input_data: Payload produced by :meth:`prepare_input_data`.

        Returns:
            A pydantic model holding the parsed result. The concrete
            executors in this module return models, and callers use
            ``.dict()`` on the value, so the annotation is ``BaseModel``
            rather than ``Dict``.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError
class CompetitorAnalysisExecutor(AnalysisExecutor):
    """Executor that produces a competitor analysis for a product."""

    async def prepare_input_data(self) -> Dict:
        """Assemble the competitor-analysis input payload.

        Returns:
            A dict with three keys: ``base_info`` (the product's basic info
            as a dict), ``competitors`` (one dict per entry in
            ``product.competitor_crawl_data``) and ``extra`` (the product's
            ``extra_result``, passed through unchanged).
        """
        return {
            "base_info": self.product.basic_info.dict(),
            "competitors": [
                c.dict() for c in self.product.competitor_crawl_data.values()
            ],
            "extra": self.product.extra_result
        }

    async def execute_analysis(
        self, input_data: Dict
    ) -> AICompetitorAnalyzeMainKeywordsResult:
        """Ask the model for a competitor analysis and parse its JSON reply.

        Args:
            input_data: Payload from :meth:`prepare_input_data`; the keys
                ``base_info``, ``competitors`` and ``extra`` are read.

        Returns:
            The model's reply parsed into
            ``AICompetitorAnalyzeMainKeywordsResult``.
        """
        prompt_template = f"""
        你是一位专业的亚马逊市场分析师,请根据以下数据生成竞品分析报告:
        产品基本信息:{input_data['base_info']}
        竞品数据:{input_data['competitors']}
        额外数据:{input_data['extra']}

        请按以下结构输出JSON:
        {{
        "main_keywords": [{{"keyword": "...", "reason": "..."}}],
        "tail_keywords": [{{"keyword": "...", "monthly_searches": ...}}],
        "comparison_analysis": "..."
        }}
        """

        # ``litellm.completion`` is synchronous and its return value is not
        # awaitable; ``acompletion`` is the coroutine variant and takes the
        # same arguments.
        response = await litellm.acompletion(
            model=self.model_provider,
            messages=[{"role": "user", "content": prompt_template}],
            response_format={"type": "json_object"}
        )

        raw_result = response.choices[0].message.content
        return AICompetitorAnalyzeMainKeywordsResult.parse_raw(raw_result)
class MarketingAnalysisExecutor(AnalysisExecutor):
    """Executor that produces a marketing plan for a product."""

    async def prepare_input_data(self) -> Dict:
        """Assemble the marketing-analysis input payload.

        Returns:
            A dict with three keys: ``product_info`` (the product's basic
            info as a dict), ``current_marketing`` (the product's marketing
            data as a dict) and ``sales_data`` (``product.variants``, passed
            through unchanged).
        """
        return {
            "product_info": self.product.basic_info.dict(),
            "current_marketing": self.product.marketing.dict(),
            "sales_data": self.product.variants
        }

    async def execute_analysis(self, input_data: Dict) -> MarketingInfo:
        """Ask the model for a marketing plan and parse its JSON reply.

        Args:
            input_data: Payload from :meth:`prepare_input_data`; the keys
                ``product_info``, ``current_marketing`` and ``sales_data``
                are read.

        Returns:
            The model's reply parsed into ``MarketingInfo``.
        """
        prompt_template = f"""
        根据产品信息生成营销方案:
        产品信息:{input_data['product_info']}
        当前营销数据:{input_data['current_marketing']}
        销售数据:{input_data['sales_data']}

        输出要求:
        {{
        "title": "产品标题",
        "selling_points": ["卖点1", "卖点2"],
        "improvement_suggestions": "..."
        }}
        """

        # ``litellm.completion`` is synchronous and its return value is not
        # awaitable; ``acompletion`` is the coroutine variant and takes the
        # same arguments.
        response = await litellm.acompletion(
            model=self.model_provider,
            messages=[{"role": "user", "content": prompt_template}],
            response_format={"type": "json_object"}
        )

        raw_result = response.choices[0].message.content
        return MarketingInfo.parse_raw(raw_result)
class AnalysisService:
    """Core service that runs one AI analysis for a product end to end."""

    def __init__(self, product: Product):
        """Bind the service to ``product`` and create its snapshot service."""
        self.product = product
        self.snapshot_service = SnapshotService()

    def _get_executor(self, analysis_type: AnalysisType, provider: ModelProvider):
        """Instantiate the executor registered for ``analysis_type``.

        Raises:
            KeyError: If no executor is registered for ``analysis_type``.
        """
        executor_map = {
            AnalysisType.COMPETITOR: CompetitorAnalysisExecutor,
            AnalysisType.MARKETING: MarketingAnalysisExecutor
        }
        return executor_map[analysis_type](
            product=self.product,
            model_provider=provider.value
        )

    async def execute_analysis(
        self,
        analysis_type: AnalysisType,
        model_provider: ModelProvider,
        compare_with_previous: bool = True
    ) -> AIAnalyzeCompare:
        """Run the full analysis flow and return the comparison record.

        Args:
            analysis_type: Which analysis to run; selects the executor.
            model_provider: Provider whose ``value`` is used as the model
                identifier and as part of the storage key.
            compare_with_previous: When true, diff the new result against
                the record previously stored under the same key and put
                the diff in ``differences`` of the returned record.

        Returns:
            The ``AIAnalyzeCompare`` built for this run. The copy written to
            the product is saved before ``differences`` is filled in.
        """
        # 1. Prepare inputs and run the analysis.
        executor = self._get_executor(analysis_type, model_provider)
        input_data = await executor.prepare_input_data()
        raw_result = await executor.execute_analysis(input_data)

        # 2. Snapshot the inputs and outputs of this run.
        snapshot = await self.snapshot_service.create_snapshot(
            product_id=str(self.product.id),
            analysis_type=analysis_type,
            model_provider=model_provider,
            input_data=input_data,
            output_data=raw_result.dict()
        )

        # 3. Read the previous record BEFORE step 5 overwrites the same
        #    key; reading it afterwards can return the record just written.
        previous = None
        if compare_with_previous:
            previous = await self._get_previous_analysis(model_provider, analysis_type)

        # 4. Build the comparison record for this run.
        compare_result = AIAnalyzeCompare(
            model=model_provider.value,
            analysis_type=analysis_type,
            input_data_hash=snapshot.input_hash,
            output_data_hash=snapshot.output_hash,
            analysis_result=raw_result
        )

        # 5. Persist the record on the product.
        await self._update_product_record(model_provider, analysis_type, compare_result)

        # 6. Diff like with like: the stored record wraps the analysis
        #    result, so unwrap it before comparing with ``raw_result``.
        if previous:
            previous_result = self._extract_analysis_result(previous)
            if previous_result is not None:
                compare_result.differences = self._compare_results(
                    raw_result, previous_result
                )
        return compare_result

    async def _update_product_record(self, provider: ModelProvider, analysis_type: AnalysisType, result: AIAnalyzeCompare):
        """Store ``result`` under ``ai_analysis_compare.<provider>_<type>``."""
        update_field = f"ai_analysis_compare.{provider.value}_{analysis_type.value}"
        await self.product.update({update_field: result.dict()})

    async def _get_previous_analysis(self, provider: ModelProvider, analysis_type: AnalysisType):
        """Return the record stored for this provider/type, or ``None``.

        ``None`` is returned both when the key is absent and when the
        product has no ``ai_analysis_compare`` mapping at all.
        """
        field_name = f"{provider.value}_{analysis_type.value}"
        stored = self.product.ai_analysis_compare or {}
        return stored.get(field_name)

    @staticmethod
    def _extract_analysis_result(record):
        """Return the ``analysis_result`` held by a stored comparison record.

        Records are written as plain dicts but may be loaded back as models,
        so both shapes are accepted. A record without an ``analysis_result``
        entry is returned unchanged.
        """
        if isinstance(record, dict):
            return record.get("analysis_result", record)
        return getattr(record, "analysis_result", record)

    @staticmethod
    def _to_plain_dict(value):
        """Convert a pydantic model to a dict; pass other values through."""
        if isinstance(value, BaseModel):
            return value.dict()
        return value

    def _compare_results(self, current: BaseModel, previous: BaseModel) -> Dict:
        """Return the DeepDiff (order-insensitive) from ``previous`` to ``current``.

        Either argument may be a pydantic model or an already-plain dict.
        """
        current_dict = self._to_plain_dict(current)
        previous_dict = self._to_plain_dict(previous)
        return DeepDiff(previous_dict, current_dict, ignore_order=True).to_dict()
|