agent_ai_analysis.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import litellm
  2. from litellm import completion
  3. from pydantic import BaseModel
  4. from typing import Any, Dict, Optional
  5. from src.models.product_model import (
  6. Product,
  7. AIAnalyzeCompare,
  8. AICompetitorAnalyzeMainKeywordsResult,
  9. MarketingInfo
  10. )
  class AnalysisExecutor(BaseModel):
      """Base class for analysis executors.

      Subclasses implement the two coroutine hooks below: one builds the
      input payload, the other runs the AI analysis on that payload.
      """

      # Product the analysis is run against.
      product: Product
      # Model identifier that subclasses pass as ``model=`` to the LLM call.
      model_provider: str

      async def prepare_input_data(self) -> Dict[str, Any]:
          """Build the input payload for the analysis.

          Raises:
              NotImplementedError: Always; subclasses must override.
          """
          raise NotImplementedError

      async def execute_analysis(self, input_data: Dict[str, Any]) -> BaseModel:
          """Run the AI analysis and return a structured result.

          The result is annotated as a pydantic model rather than a dict
          because the subclasses in this module return parsed models.

          Args:
              input_data: Payload produced by ``prepare_input_data``.

          Raises:
              NotImplementedError: Always; subclasses must override.
          """
          raise NotImplementedError
  class CompetitorAnalysisExecutor(AnalysisExecutor):
      """Executor that asks the model for a competitor-analysis report."""

      async def prepare_input_data(self) -> Dict[str, Any]:
          """Assemble the data that is interpolated into the prompt.

          Returns:
              A dict with three keys: ``base_info`` (``product.basic_info``
              as a dict), ``competitors`` (one dict per value of
              ``product.competitor_crawl_data``) and ``extra``
              (``product.extra_result`` passed through unchanged).
          """
          return {
              "base_info": self.product.basic_info.dict(),
              "competitors": [
                  c.dict() for c in self.product.competitor_crawl_data.values()
              ],
              "extra": self.product.extra_result,
          }

      async def execute_analysis(
          self, input_data: Dict[str, Any]
      ) -> AICompetitorAnalyzeMainKeywordsResult:
          """Run the competitor analysis and parse the model's JSON reply.

          Args:
              input_data: Payload with the ``base_info``, ``competitors`` and
                  ``extra`` keys, as built by ``prepare_input_data``.

          Returns:
              The reply parsed into ``AICompetitorAnalyzeMainKeywordsResult``.
          """
          prompt_template = f"""
          你是一位专业的亚马逊市场分析师,请根据以下数据生成竞品分析报告:
          产品基本信息:{input_data['base_info']}
          竞品数据:{input_data['competitors']}
          额外数据:{input_data['extra']}
          请按以下结构输出JSON:
          {{
          "main_keywords": [{{"keyword": "...", "reason": "..."}}],
          "tail_keywords": [{{"keyword": "...", "monthly_searches": ...}}],
          "comparison_analysis": "..."
          }}
          """
          # ``litellm.completion`` is a blocking call and its return value is
          # not awaitable; ``litellm.acompletion`` is the coroutine variant.
          response = await litellm.acompletion(
              model=self.model_provider,
              messages=[{"role": "user", "content": prompt_template}],
              response_format={"type": "json_object"},
          )
          raw_result = response.choices[0].message.content
          return AICompetitorAnalyzeMainKeywordsResult.parse_raw(raw_result)
  class MarketingAnalysisExecutor(AnalysisExecutor):
      """Executor that asks the model for a marketing plan."""

      async def prepare_input_data(self) -> Dict[str, Any]:
          """Assemble the data that is interpolated into the prompt.

          Returns:
              A dict with three keys: ``product_info`` (``product.basic_info``
              as a dict), ``current_marketing`` (``product.marketing`` as a
              dict) and ``sales_data`` (``product.variants`` passed through
              unchanged).
          """
          return {
              "product_info": self.product.basic_info.dict(),
              "current_marketing": self.product.marketing.dict(),
              "sales_data": self.product.variants,
          }

      async def execute_analysis(self, input_data: Dict[str, Any]) -> MarketingInfo:
          """Run the marketing analysis and parse the model's JSON reply.

          Args:
              input_data: Payload with the ``product_info``,
                  ``current_marketing`` and ``sales_data`` keys, as built by
                  ``prepare_input_data``.

          Returns:
              The reply parsed into ``MarketingInfo``.
          """
          # The prompt now names JSON explicitly: JSON mode
          # (``response_format={"type": "json_object"}``) on OpenAI-style
          # providers requires the messages to ask for JSON output.
          prompt_template = f"""
          根据产品信息生成营销方案:
          产品信息:{input_data['product_info']}
          当前营销数据:{input_data['current_marketing']}
          销售数据:{input_data['sales_data']}
          输出要求(请按以下结构输出JSON):
          {{
          "title": "产品标题",
          "selling_points": ["卖点1", "卖点2"],
          "improvement_suggestions": "..."
          }}
          """
          # ``litellm.completion`` is a blocking call and its return value is
          # not awaitable; ``litellm.acompletion`` is the coroutine variant.
          response = await litellm.acompletion(
              model=self.model_provider,
              messages=[{"role": "user", "content": prompt_template}],
              response_format={"type": "json_object"},
          )
          raw_result = response.choices[0].message.content
          return MarketingInfo.parse_raw(raw_result)
  class AnalysisService:
      """Core service that runs an AI analysis for one product and records it."""

      # NOTE(review): SnapshotService, AnalysisType, ModelProvider and DeepDiff
      # are used in this class but are not imported in the visible part of the
      # file -- confirm they are brought into scope elsewhere.
      # Project types in the signatures are quoted (forward references) so the
      # annotations are not evaluated when the class is defined.

      def __init__(self, product: "Product"):
          """Bind the service to ``product`` and create a snapshot service."""
          self.product = product
          self.snapshot_service = SnapshotService()

      def _get_executor(
          self, analysis_type: "AnalysisType", provider: "ModelProvider"
      ) -> "AnalysisExecutor":
          """Instantiate the executor registered for ``analysis_type``.

          Raises:
              KeyError: If ``analysis_type`` has no registered executor.
          """
          executor_map = {
              AnalysisType.COMPETITOR: CompetitorAnalysisExecutor,
              AnalysisType.MARKETING: MarketingAnalysisExecutor,
          }
          return executor_map[analysis_type](
              product=self.product,
              model_provider=provider.value,
          )

      async def execute_analysis(
          self,
          analysis_type: "AnalysisType",
          model_provider: "ModelProvider",
          compare_with_previous: bool = True,
      ) -> "AIAnalyzeCompare":
          """Run the full analysis flow.

          Steps: run the executor, snapshot input and output, build the
          ``AIAnalyzeCompare`` record, store it on the product and, when
          requested, diff the new result against the previously stored one.

          Args:
              analysis_type: Which analysis to run.
              model_provider: Which model provider to use.
              compare_with_previous: When true, set ``differences`` on the
                  returned record if an earlier record exists.

          Returns:
              The newly built ``AIAnalyzeCompare`` record.
          """
          # 1. Prepare the input and run the analysis.
          executor = self._get_executor(analysis_type, model_provider)
          input_data = await executor.prepare_input_data()
          raw_result = await executor.execute_analysis(input_data)

          # 2. Snapshot the input/output pair.
          snapshot = await self.snapshot_service.create_snapshot(
              product_id=str(self.product.id),
              analysis_type=analysis_type,
              model_provider=model_provider,
              input_data=input_data,
              output_data=raw_result.dict(),
          )

          # 3. Build the record for this run.
          compare_result = AIAnalyzeCompare(
              model=model_provider.value,
              analysis_type=analysis_type,
              input_data_hash=snapshot.input_hash,
              output_data_hash=snapshot.output_hash,
              analysis_result=raw_result,
          )

          # 4. Read the previous record BEFORE overwriting it: the update in
          #    step 5 writes to the same key, so reading afterwards could
          #    return the record that was just stored.
          previous = None
          if compare_with_previous:
              previous = await self._get_previous_analysis(
                  model_provider, analysis_type
              )

          # 5. Store the new record (without differences, as before).
          await self._update_product_record(
              model_provider, analysis_type, compare_result
          )

          # 6. Diff like with like: the stored record wraps the raw result in
          #    ``analysis_result``, so unwrap it before comparing.
          if previous:
              compare_result.differences = self._compare_results(
                  raw_result, self._extract_analysis_result(previous)
              )
          return compare_result

      async def _update_product_record(
          self,
          provider: "ModelProvider",
          analysis_type: "AnalysisType",
          result: "AIAnalyzeCompare",
      ):
          """Store ``result`` on the product.

          The record is written under
          ``ai_analysis_compare.<provider>_<analysis_type>``.
          """
          update_field = f"ai_analysis_compare.{provider.value}_{analysis_type.value}"
          await self.product.update({update_field: result.dict()})

      async def _get_previous_analysis(
          self, provider: "ModelProvider", analysis_type: "AnalysisType"
      ):
          """Return the stored record for this provider and analysis type.

          Returns ``None`` when there is no such record, including when the
          product's ``ai_analysis_compare`` mapping is missing or empty.
          """
          field_name = f"{provider.value}_{analysis_type.value}"
          records = self.product.ai_analysis_compare or {}
          return records.get(field_name)

      @staticmethod
      def _extract_analysis_result(record: Any) -> Any:
          """Return the ``analysis_result`` held by a stored record.

          Accepts either a mapping or an object with attributes. Falls back
          to ``record`` itself when no ``analysis_result`` is present.
          """
          if isinstance(record, dict):
              inner = record.get("analysis_result")
          else:
              inner = getattr(record, "analysis_result", None)
          return record if inner is None else inner

      @staticmethod
      def _to_plain_dict(value: Any) -> Any:
          """Return ``value.dict()`` for model-like objects, else ``value``."""
          to_dict = getattr(value, "dict", None)
          return to_dict() if callable(to_dict) else value

      def _compare_results(self, current: "BaseModel", previous: "BaseModel") -> Dict:
          """Return the differences between two analysis results.

          Both arguments may be pydantic models or plain dicts; stored
          records are persisted via ``.dict()`` and may come back as dicts.

          Returns:
              ``DeepDiff(previous, current, ignore_order=True)`` as a dict.
          """
          current_dict = self._to_plain_dict(current)
          previous_dict = self._to_plain_dict(previous)
          return DeepDiff(previous_dict, current_dict, ignore_order=True).to_dict()