| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- # tasks/save_tasks.py
- from config.celery import app # Keep celery app
- from src.browser.crawl_asin import Crawler
- from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey,CrawlerSearchKeyInput
- from config.settings import CFG
- from utils.drission_page import ChromeOptions
- from utils.file import check_exists, save_to_file,s3_uri_to_http_url
- from utils.logu import get_logger
- logger = get_logger('worker')
- import asyncio
@app.task(bind=True,
          name='tasks.crawl_asin_save_task.get_asin_and_save_page',
          max_retries=3,
          rate_limit='10/m')
def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP',
                           overwrite: bool = False):
    """Celery task: crawl an ASIN's product page and persist it as MHTML.

    Args:
        asin: Amazon ASIN to crawl.
        asin_area: Marketplace/region code (default ``'JP'``).
        overwrite: When True, re-save even if the target already exists.

    Returns:
        dict: ``{'status': 'success', 'path': <final saved path>}``.

    Retries:
        On any exception, retries up to ``max_retries`` (3) with a
        60-second countdown.
    """
    try:
        logger.info(f"任务开始: asin {asin} , asin_area {asin_area} overwrite {overwrite} ")
        # Build the browser configuration from the shared ini file.
        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
        crawler = Crawler(chrome_options=chrome_options)
        # Keep the .mhtml suffix consistent with the saved content type.
        save_path = Crawler.s3_prefix + f"{asin}/{asin}.mhtml"
        # Perform the crawl-and-save operation.
        final_path = crawler.get_asin_and_save_page(
            asin=asin,
            asin_area=asin_area,
            save_path=save_path,
            # NOTE(review): keyword is spelled 'mthml_type' in Crawler's API;
            # keep as-is here — renaming must happen in Crawler first.
            mthml_type=True,
            overwrite=overwrite,
        )
        logger.info(f"任务成功: {final_path}")
        return {'status': 'success', 'path': final_path}

    except Exception as e:
        logger.exception(f"任务失败:{e}")
        # `raise` makes the retry signal explicit (Celery's documented idiom);
        # self.retry() raises celery.exceptions.Retry internally.
        raise self.retry(exc=e, countdown=60)
@app.task(bind=True)
def get_amz_search_key_suggestion(self, input_data: CrawlerSearchKeyInput):
    """Celery task: fetch Amazon search-keyword suggestions for the given input.

    Args:
        input_data: ``CrawlerSearchKeyInput`` describing the suggestion request.
            NOTE(review): passing a rich object as a task argument relies on the
            configured Celery serializer accepting it — verify against callers.

    Returns:
        The result produced by ``CrawlerAmzSearchKey.crawl_suggestion``.

    Retries:
        On any exception, retries with a 60-second countdown (Celery's
        default ``max_retries`` applies, since none is set here).
    """
    try:
        logger.info(f"任务开始: input_data {input_data}")
        # Build the browser configuration from the shared ini file.
        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
        crawler = CrawlerAmzSearchKey(chrome_options=chrome_options)
        # asyncio.run() creates and cleanly closes a fresh event loop per call.
        # The previous asyncio.get_event_loop() is deprecated when no loop is
        # running (Python 3.10+) and fails in non-main worker threads.
        res = asyncio.run(crawler.crawl_suggestion(input_data))
        logger.info(f"异步任务完成: {res}")
        return res

    except Exception as e:
        logger.exception(f"任务失败:{e}")
        # `raise` makes the retry signal explicit (Celery's documented idiom).
        raise self.retry(exc=e, countdown=60)
|