# tasks/save_tasks.py
"""Celery tasks: save Amazon ASIN product pages and fetch search-key suggestions."""
import asyncio

from config.celery import app  # Keep celery app
from src.browser.crawl_asin import Crawler
from src.browser.crawl_amz_search_key import CrawlerAmzSearchKey, CrawlerSearchKeyInput
from config.settings import CFG
from utils.drission_page import ChromeOptions
from utils.file import check_exists, save_to_file, s3_uri_to_http_url
from utils.logu import get_logger

logger = get_logger('worker')


@app.task(bind=True, name='tasks.crawl_asin_save_task.get_asin_and_save_page',
          max_retries=3, rate_limit='10/m')
def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP', overwrite: bool = False):
    """Crawl the Amazon product page for *asin* and persist it as an MHTML snapshot.

    Args:
        asin: Amazon product identifier to crawl.
        asin_area: Marketplace/region code (default ``'JP'``).
        overwrite: If True, re-crawl and overwrite an existing saved page.

    Returns:
        dict: ``{'status': 'success', 'path': <final saved path>}`` on success.

    Raises:
        celery.exceptions.Retry: on any failure the task is re-queued with a
            60-second countdown, up to ``max_retries=3``.
    """
    try:
        logger.info(f"任务开始: asin {asin} , asin_area {asin_area} overwrite {overwrite} ")
        # Build the browser configuration from the project's chrome ini file.
        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
        crawler = Crawler(chrome_options=chrome_options)
        # Keep the .mhtml suffix consistent with the saved content type.
        save_path = Crawler.s3_prefix + f"{asin}/{asin}.mhtml"
        # Perform the crawl-and-save operation.
        final_path = crawler.get_asin_and_save_page(
            asin=asin,
            asin_area=asin_area,
            save_path=save_path,
            # NOTE(review): keyword looks misspelled ("mthml") — it must match
            # Crawler's signature, so it is kept as-is; confirm upstream.
            mthml_type=True,
            overwrite=overwrite,
        )
        logger.info(f"任务成功: {final_path}")
        return {'status': 'success', 'path': final_path}
    except Exception as e:
        logger.exception(f"任务失败:{e}")
        # self.retry() raises Retry itself; the explicit `raise` makes the
        # control flow obvious and satisfies static analysis.
        raise self.retry(exc=e, countdown=60)


@app.task(bind=True)
def get_amz_search_key_suggestion(self, input_data: CrawlerSearchKeyInput):
    """Fetch Amazon search-keyword suggestions via the async crawler.

    Args:
        input_data: Crawl parameters. Annotated as ``CrawlerSearchKeyInput``,
            but Celery may deliver a serialized dict — TODO confirm how the
            broker serializes this argument.

    Returns:
        The resolved result of ``CrawlerAmzSearchKey.crawl_suggestion``.

    Raises:
        celery.exceptions.Retry: on any failure the task is re-queued with a
            60-second countdown.
    """
    try:
        logger.info(f"任务开始: input_data {input_data}")
        # Build the browser configuration from the project's chrome ini file.
        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
        crawler = CrawlerAmzSearchKey(chrome_options=chrome_options)
        # asyncio.run() creates and tears down a fresh event loop per task.
        # The previous asyncio.get_event_loop() is deprecated when no loop is
        # running and raises RuntimeError in non-main worker threads.
        res = asyncio.run(crawler.crawl_suggestion(input_data))
        logger.info(f"异步任务完成: {res}")
        return res
    except Exception as e:
        logger.exception(f"任务失败:{e}")
        raise self.retry(exc=e, countdown=60)