|
|
@@ -1,4 +1,5 @@
|
|
|
import asyncio
|
|
|
+from datetime import datetime
|
|
|
import json
|
|
|
from pathlib import Path
|
|
|
from config.settings import CFG
|
|
|
@@ -8,7 +9,7 @@ from utils.file import save_to_file, read_file
|
|
|
from config.celery import app
|
|
|
# Remove direct task imports
|
|
|
from celery.result import AsyncResult
|
|
|
-from src.models.product_model import Product,CompetitorAnalysis
|
|
|
+from src.models.product_model import Product, CompetitorCrawlData, AICompetitorAnalyzeMainKeywords, SearchAmazoneKeyResult, ProductBaseInfo, Variant
|
|
|
from utils.logu import get_logger
|
|
|
from upath import UPath
|
|
|
logger = get_logger('main')
|
|
|
@@ -85,13 +86,44 @@ class ManagerTask:
|
|
|
else:
|
|
|
self.db.add_or_ignore_asin_seed(AsinSeed(asin=asin, asin_area=asin_area, mhtml_path=task_result['path']))
|
|
|
return asin_seed
|
|
|
- def submit_suggestion_task_and_wait(self, asin: str, asin_area: str = 'JP', timeout: int = 300):
|
|
|
- """提交页面解析任务并等待完成,保存结果到数据库"""
|
|
|
- # 从数据库获取mhtml路径
|
|
|
- asin_seed = self.db.get_asin_seed(asin)
|
|
|
- if asin_seed and asin_seed.extra_result_path:
|
|
|
- logger.info(f"{asin}已经解析过,跳过")
|
|
|
-
|
|
|
+ def submit_suggestion_task_and_wait(self, product: Product, search_key: str, timeout: int = 300):
|
|
|
+ """提交关键词建议任务并等待完成,保存结果到数据库"""
|
|
|
+ # 检查是否已有缓存结果
|
|
|
+ existing = next((item for item in product.competitor_analyze if item.search_key == search_key), None)
|
|
|
+ if existing and existing.crawl_result:
|
|
|
+ logger.info(f"搜索建议已存在:{search_key},跳过")
|
|
|
+ return existing
|
|
|
+
|
|
|
+ logger.info(f"开始提交搜索建议任务:{search_key}")
|
|
|
+
|
|
|
+ # 提交Celery任务
|
|
|
+ task = app.send_task('tasks.crawl_asin_exract_task.process_search_suggestion', args=[search_key])
|
|
|
+
|
|
|
+ # 等待任务完成
|
|
|
+ result = AsyncResult(task.id)
|
|
|
+ result.get(timeout=timeout)
|
|
|
+
|
|
|
+ # 处理任务结果
|
|
|
+ if result.successful():
|
|
|
+ task_result = result.result
|
|
|
+ if task_result['status'] == 'success':
|
|
|
+ # 创建新的分析记录
|
|
|
+ new_analyze = AICompetitorAnalyzeMainKeywords(
|
|
|
+ search_key=search_key,
|
|
|
+ crawl_result=SearchAmazoneKeyResult(**task_result['data']),
|
|
|
+ created_at=datetime.now()
|
|
|
+ )
|
|
|
+
|
|
|
+ # 添加到产品分析列表
|
|
|
+ product.competitor_analyze.append(new_analyze)
|
|
|
+ self.db_mongo.save(product)
|
|
|
+
|
|
|
+ logger.info(f"搜索建议处理成功:{search_key}")
|
|
|
+ return new_analyze
|
|
|
+
|
|
|
+ logger.error(f"搜索建议处理失败:{search_key}")
|
|
|
+ return None
|
|
|
+
|
|
|
async def extract_competitor_analysis(self, product: Product, timeout: int = 300):
|
|
|
"""异步提交竞争对手分析任务并等待结果
|
|
|
Args:
|
|
|
@@ -103,8 +135,8 @@ class ManagerTask:
|
|
|
tasks = []
|
|
|
|
|
|
# 收集需要处理的任务
|
|
|
- for asin, competitor in product.competitor_analysis.items():
|
|
|
- competitor: CompetitorAnalysis
|
|
|
+ for asin, competitor in product.competitor_crawl_data.items():
|
|
|
+ competitor: CompetitorCrawlData
|
|
|
if competitor.extra_result:
|
|
|
logger.info(f"{asin}已缓存结果,跳过处理")
|
|
|
continue
|
|
|
@@ -145,7 +177,7 @@ async def test_product_mongo():
|
|
|
asin_completed = manager.db.get_asin_completed()
|
|
|
logger.info(f"{asin_completed}")
|
|
|
for asin_model in asin_completed:
|
|
|
- if not product.competitor_analysis or asin_model.asin not in product.competitor_analysis:
|
|
|
+ if not product.competitor_crawl_data or asin_model.asin not in product.competitor_crawl_data:
|
|
|
logger.info(f"{asin_model.mhtml_path}")
|
|
|
mthml_data = read_file(asin_model.mhtml_path)
|
|
|
mhtml_path_name = Path(asin_model.mhtml_path).name
|
|
|
@@ -153,14 +185,14 @@ async def test_product_mongo():
|
|
|
res = save_to_file(mthml_data, new_path)
|
|
|
logger.info(f"new path {res}")
|
|
|
continue
|
|
|
- compet = CompetitorAnalysis(
|
|
|
+ compet = CompetitorCrawlData(
|
|
|
sql_id=asin_model.id,
|
|
|
asin=asin_model.asin,
|
|
|
asin_area=asin_model.asin_area,
|
|
|
mhtml_path=asin_model.mhtml_path,
|
|
|
extra_result_path=asin_model.extra_result_path,
|
|
|
created_at=asin_model.created_at,)
|
|
|
- product.competitor_analysis[asin_model.asin] = compet
|
|
|
+ product.competitor_crawl_data[asin_model.asin] = compet
|
|
|
await product.save()
|
|
|
return product
|
|
|
async def main():
|
|
|
@@ -170,7 +202,7 @@ async def main():
|
|
|
product = await Product.find_one(Product.basic_info.name == "电线保护套")
|
|
|
await manager.extract_competitor_analysis(product)
|
|
|
return
|
|
|
- for asin in product.competitor_analysis.keys():
|
|
|
+ for asin in product.competitor_crawl_data.keys():
|
|
|
logger.info(f"{asin}")
|
|
|
manager.submit_asinseed_task_and_wait(asin)
|
|
|
manager.submit_extract_task_and_wait(asin)
|