celery.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from celery import Celery
  2. from config.settings import CFG
  3. from config import celery_config
  4. from celery import signals
  5. import os
  6. from utils.logu import logger
  7. @signals.worker_init.connect
  8. def on_worker_init(sender=None, **kwargs):
  9. """Worker启动时初始化检查"""
  10. # 打印关键配置(过滤敏感字段)
  11. cfg_info = CFG.model_dump(exclude={'s3_secret_key'})
  12. # s3_secret_key not show in log
  13. logger.info(f"Worker启动配置检查:\n{cfg_info}")
  14. # 状态检查示例:确保输出目录存在
  15. output_dir = f"{CFG.s3_prefix}/output"
  16. if not os.path.exists(output_dir):
  17. os.makedirs(output_dir, exist_ok=True)
  18. logger.warning(f"创建缺失的output目录: {output_dir}")
  19. logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
  20. @signals.worker_shutdown.connect
  21. def on_worker_shutdown(sender=None, **kwargs):
  22. """Worker关闭时执行清理操作"""
  23. logger.info("Worker正在关闭,执行清理操作...")
  24. # 示例:关闭数据库连接、释放资源等
  25. # close_db_connections()
  26. logger.info("Worker关闭完成")
  27. app = Celery(
  28. 'copywriting_production',
  29. backend=CFG.redis_url,
  30. include=[
  31. 'src.tasks.crawl_asin_save_task',
  32. 'src.tasks.crawl_asin_exract_task',
  33. ]
  34. )
  35. app.config_from_object(celery_config)