| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from celery import Celery
- from config.settings import CFG
- from config import celery_config
- from celery import signals
- import os
- from utils.logu import logger
- @signals.worker_init.connect
- def on_worker_init(sender=None, **kwargs):
- """Worker启动时初始化检查"""
- # 打印关键配置(过滤敏感字段)
- cfg_info = CFG.model_dump(exclude={'s3_secret_key'})
- # s3_secret_key not show in log
- logger.info(f"Worker启动配置检查:\n{cfg_info}")
-
- # 状态检查示例:确保输出目录存在
- output_dir = f"{CFG.s3_prefix}/output"
- if not os.path.exists(output_dir):
- os.makedirs(output_dir, exist_ok=True)
- logger.warning(f"创建缺失的output目录: {output_dir}")
-
- logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
- @signals.worker_shutdown.connect
- def on_worker_shutdown(sender=None, **kwargs):
- """Worker关闭时执行清理操作"""
- logger.info("Worker正在关闭,执行清理操作...")
-
- # 示例:关闭数据库连接、释放资源等
- # close_db_connections()
-
- logger.info("Worker关闭完成")
-
- app = Celery(
- 'copywriting_production',
- backend=CFG.redis_url,
- include=[
- 'src.tasks.crawl_asin_save_task',
- 'src.tasks.crawl_asin_exract_task',
- ]
- )
- app.config_from_object(celery_config)
|