celery.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from celery import Celery
  2. from config.settings import CFG
  3. from config import celery_config
  4. from celery import signals
  5. import os
  6. from utils.logu import logger
  7. from utils.drission_page import ChromeOptions
  8. from utils.drission_page import load_chrome_from_ini,ChromeOptions
  9. @signals.worker_init.connect
  10. def on_worker_init(sender=None, **kwargs):
  11. """Worker启动时初始化检查"""
  12. # 打印关键配置(过滤敏感字段)
  13. cfg_info = CFG.model_dump(exclude={'s3_secret_key'})
  14. # s3_secret_key not show in log
  15. logger.info(f"Worker启动配置检查:\n{cfg_info}")
  16. # 状态检查示例:确保输出目录存在
  17. output_dir = f"{CFG.s3_prefix}/output"
  18. if not os.path.exists(output_dir):
  19. os.makedirs(output_dir, exist_ok=True)
  20. logger.warning(f"创建缺失的output目录: {output_dir}")
  21. logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
  22. logger.info(f"浏览器配置: {CFG.chrome_config_ini}")
  23. @signals.worker_shutdown.connect
  24. def on_worker_shutdown(sender=None, **kwargs):
  25. """Worker关闭时执行清理操作"""
  26. logger.info("Worker正在关闭,执行清理操作...")
  27. # 示例:关闭数据库连接、释放资源等
  28. logger.info("Worker关闭完成")
  29. app = Celery(
  30. 'copywriting_production',
  31. backend=CFG.celery_reulst_url,
  32. broker=CFG.redis_url,
  33. include=[
  34. 'src.tasks.crawl_asin_save_task',
  35. 'src.tasks.crawl_asin_exract_task',
  36. ]
  37. )
  38. app.config_from_object(celery_config)