"""Celery application bootstrap: worker start-up checks and app construction."""

import os

from celery import Celery, signals

from config.settings import CFG
from config import celery_config
from utils.logu import logger


@signals.worker_init.connect
def on_worker_init(sender=None, **kwargs):
    """Run start-up checks when the Celery ``worker_init`` signal fires.

    Logs a dump of ``CFG`` with ``s3_secret_key`` excluded, creates the
    ``<CFG.s3_prefix>/output`` directory if it does not exist yet, and
    finally logs ``CFG.version``.

    Args:
        sender: Signal sender supplied by Celery; not used here.
        **kwargs: Any extra signal arguments; not used here.
    """
    # Keep the S3 secret out of the log output; every other field is dumped.
    safe_config = CFG.model_dump(exclude={"s3_secret_key"})
    logger.info(f"Worker启动配置检查:\n{safe_config}")

    # Example health check: make sure the output directory is present.
    output_path = f"{CFG.s3_prefix}/output"
    directory_missing = not os.path.exists(output_path)
    if directory_missing:
        os.makedirs(output_path, exist_ok=True)
        # Only warn when the directory actually had to be created.
        logger.warning(f"创建缺失的output目录: {output_path}")

    logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")


# The result backend comes from CFG.redis_url; remaining settings are loaded
# from the celery_config module right after construction.
app = Celery(
    "copywriting_production",
    backend=CFG.redis_url,
    include=[
        "src.tasks.crawl_asin_save_task",
        "src.tasks.crawl_asin_exract_task",
    ],
)
app.config_from_object(celery_config)