celery.py 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. from celery import Celery
  2. from config.settings import CFG
  3. from config import celery_config
  4. from celery import signals
  5. import os
  6. from utils.logu import logger
  7. @signals.worker_init.connect
  8. def on_worker_init(sender=None, **kwargs):
  9. """Worker启动时初始化检查"""
  10. # 打印关键配置(过滤敏感字段)
  11. cfg_info = CFG.model_dump(exclude={'s3_secret_key'})
  12. # s3_secret_key not show in log
  13. logger.info(f"Worker启动配置检查:\n{cfg_info}")
  14. # 状态检查示例:确保输出目录存在
  15. output_dir = f"{CFG.s3_prefix}/output"
  16. if not os.path.exists(output_dir):
  17. os.makedirs(output_dir, exist_ok=True)
  18. logger.warning(f"创建缺失的output目录: {output_dir}")
  19. logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
  20. app = Celery(
  21. 'copywriting_production',
  22. backend=CFG.redis_url,
  23. include=[
  24. 'src.tasks.crawl_asin_save_task',
  25. 'src.tasks.crawl_asin_exract_task',
  26. ]
  27. )
  28. app.config_from_object(celery_config)