
Complete the Amazon search for main keywords and store the results in the database. Back up MongoDB.

mrh 11 months ago
commit a240577ead
2 files changed with 35 additions and 30 deletions
  1. src/manager/manager_task.py (+34 −29)
  2. src/tasks/crawl_asin_save_task.py (+1 −1)

+ 34 - 29
src/manager/manager_task.py

@@ -89,44 +89,48 @@ class ManagerTask:
             else:
                 self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
             return asin_seed
-    def submit_suggestion_task_and_wait(self, product: Product, search_key: str, timeout: int = 300):
-        """Submit a keyword-suggestion task, wait for it to finish, and save the result to the database"""
+    async def submit_suggestion_task_and_wait(self, ai_competitor_analyze_model: AICompetitorAnalyzeMainKeywords, timeout: int = 300):
+        """Asynchronously submit a keyword-suggestion task and wait for it to finish; the result is stored on the model and persisted by the caller"""
        # Check whether a cached result already exists
-        existing = next((item for item in product.competitor_analyze if item.search_key == search_key), None)
-        if existing and existing.crawl_result:
-            logger.info(f"Search suggestion already exists: {search_key}, skipping")
-            return existing
+        if ai_competitor_analyze_model.crawl_result:
+            logger.info(f"Search suggestion already exists: {ai_competitor_analyze_model.asin} {ai_competitor_analyze_model.main_key}, skipping")
+            return ai_competitor_analyze_model
         
-        logger.info(f"开始提交搜索建议任务:{search_key}")
+        search_key = ai_competitor_anlayze_model.main_key
+        logger.info(f"开始提交搜索建议任务:{ai_competitor_anlayze_model.asin} {search_key}")
         
        # Submit the Celery task
         task = app.send_task('tasks.crawl_asin_exract_task.process_search_suggestion', args=[search_key])
-        
-        # Wait for the task to complete
         result = AsyncResult(task.id)
-        result.get(timeout=timeout)
         
-        # Handle the task result
-        if result.successful():
-            task_result = result.result
-            if task_result['status'] == 'success':
-                # Create a new analysis record
-                new_analyze = AICompetitorAnalyzeMainKeywords(
-                    search_key=search_key,
-                    crawl_result=SearchAmazoneKeyResult(**task_result['data']),
-                    created_at=datetime.now()
-                )
-                
-                # Append to the product's analysis list
-                product.competitor_analyze.append(new_analyze)
-                self.db_mongo.save(product)
-                
-                logger.info(f"Search suggestion processed successfully: {search_key}")
-                return new_analyze
+        try:
+            # Wait for the task asynchronously; enforce the timeout that the
+            # old blocking result.get(timeout=timeout) call used to handle
+            elapsed = 0.0
+            while not result.ready():
+                if elapsed >= timeout:
+                    raise TimeoutError(f"Task {task.id} did not finish within {timeout}s")
+                await asyncio.sleep(0.1)
+                elapsed += 0.1
+
+            if result.successful():
+                task_result = result.result
+                if task_result['status'] == 'success':
+                    ai_competitor_analyze_model.crawl_result = SearchAmazoneKeyResult(**task_result['data'])
+                    logger.info(f"Search suggestion processed successfully: {search_key}")
+                    return ai_competitor_analyze_model
+        except Exception as e:
+            logger.error(f"Exception while processing search suggestion: {search_key} | error: {e}")
         
         logger.error(f"搜索建议处理失败:{search_key}")
         return None
-        
+    
+    async def submit_search_mainkeyword(self, product: Product, timeout: int = 300):
+        """Submit a suggestion task for each AI-analyzed main keyword and persist the results to the database"""
+        # Nothing to do if the AI analysis has not produced any main keywords yet
+        if not product.competitor_analyze or not product.competitor_analyze.results:
+            logger.warning(f"AI-analyzed main-keyword results not found: {product.basic_info.name}")
+            return product
+        for ai_result in product.competitor_analyze.results:
+            await self.submit_suggestion_task_and_wait(ai_result)
+        await product.save()
+        return product
+
     async def extract_competitor_analysis(self, product: Product, timeout: int = 300):
         """异步提交竞争对手分析任务并等待结果
         Args:
@@ -247,7 +251,8 @@ async def main():
     await manager.db_mongo.initialize()
     product = await Product.find_one(Product.basic_info.name == "电线保护套")
     # await manager.extract_competitor_analysis(product)
-    await manager.async_analyze_and_save(product)
+    # await manager.async_analyze_and_save(product)
+    await manager.submit_search_mainkeyword(product)
     return
     for asin in product.competitor_crawl_data.keys():
         logger.info(f"{asin}")

+ 1 - 1
src/tasks/crawl_asin_save_task.py

@@ -1,7 +1,7 @@
 # tasks/crawl_asin_save_task.py
 from config.celery import app  # Keep celery app
 from src.browser.crawl_asin import Crawler
-from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey,SearchKeyResult,CrawlerSearchKeyInput
+from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey, CrawlerSearchKeyInput
 from config.settings import CFG
 from utils.drission_page import ChromeOptions
 from utils.file import check_exists, save_to_file, s3_uri_to_http_url
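The commit message also mentions backing up MongoDB, but no backup code appears in this diff. A hypothetical sketch of such a step, assuming the standard mongodump CLI is installed; backup_mongodb, mongo_uri, and backup_root are illustrative names, not from this repo:

import subprocess
from datetime import datetime
from pathlib import Path


def backup_mongodb(mongo_uri: str, backup_root: str) -> Path:
    """Dump a MongoDB instance into a timestamped folder with mongodump."""
    out_dir = Path(backup_root) / datetime.now().strftime('%Y%m%d_%H%M%S')
    out_dir.mkdir(parents=True, exist_ok=True)
    # --uri and --out are standard mongodump flags
    subprocess.run(
        ['mongodump', f'--uri={mongo_uri}', f'--out={out_dir}'],
        check=True,  # raise CalledProcessError if the dump fails
    )
    return out_dir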