crawl_asin_save_task.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # tasks/crawl_asin_save_task.py
  2. from config.celery import app # Keep celery app
  3. from src.browser.crawl_asin import Crawler
  4. from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey,CrawlerSearchKeyInput
  5. from config.settings import CFG
  6. from utils.drission_page import ChromeOptions
  7. from utils.file import check_exists, save_to_file,s3_uri_to_http_url
  8. from utils.logu import get_logger
  9. logger = get_logger('worker')
  10. import asyncio
  11. @app.task(bind=True,
  12. name='tasks.crawl_asin_save_task.get_asin_and_save_page',
  13. max_retries=3,
  14. rate_limit='10/m')
  15. def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP',
  16. overwrite: bool = False):
  17. """Celery task for saving ASIN page"""
  18. try:
  19. logger.info(f"任务开始: asin {asin} , asin_area {asin_area} overwrite {overwrite} ")
  20. # 初始化浏览器配置
  21. chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
  22. crawler = Crawler(chrome_options=chrome_options)
  23. save_path = Crawler.s3_prefix + f"{asin}/{asin}.mhtml" # 保持.mhtml后缀与内容类型一致
  24. # 执行保存操作
  25. final_path = crawler.get_asin_and_save_page(
  26. asin=asin,
  27. asin_area=asin_area,
  28. save_path=save_path,
  29. mthml_type=True,
  30. overwrite=overwrite,
  31. )
  32. logger.info(f"任务成功: {final_path}")
  33. return {'status': 'success', 'path': final_path}
  34. except Exception as e:
  35. logger.exception(f"任务失败:{e}")
  36. self.retry(exc=e, countdown=60)
  37. @app.task(bind=True)
  38. def get_amz_search_key_suggestion(self, input_data: CrawlerSearchKeyInput):
  39. try:
  40. logger.info(f"任务开始: input_data {input_data}")
  41. loop = asyncio.get_event_loop()
  42. # 初始化浏览器配置
  43. chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
  44. crawler = CrawlerAmzSearchKey(chrome_options=chrome_options)
  45. res = loop.run_until_complete(crawler.crawl_suggestion(input_data))
  46. logger.info(f"异步任务完成: {res}")
  47. return res
  48. except Exception as e:
  49. logger.exception(f"任务失败:{e}")
  50. self.retry(exc=e, countdown=60)