| 1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- from celery import Celery
- from config.settings import CFG
- from config import celery_config
- from celery import signals
- import os
- from utils.logu import logger
- from utils.drission_page import ChromeOptions
- from utils.drission_page import load_chrome_from_ini,ChromeOptions
@signals.worker_init.connect
def on_worker_init(sender=None, **kwargs):
    """Run start-up checks when the worker initialises.

    Registered as a receiver of the ``worker_init`` signal. Logs the
    configuration (minus the S3 secret key), makes sure the output
    directory exists, and logs the configuration version and the browser
    configuration setting.

    Args:
        sender: Signal sender; not used by this handler.
        **kwargs: Extra signal arguments; not used by this handler.
    """
    # Dump the configuration with the S3 secret key excluded so that it is
    # never written to the log.
    redacted_cfg = CFG.model_dump(exclude={'s3_secret_key'})
    logger.info(f"Worker启动配置检查:\n{redacted_cfg}")

    # Example state check: make sure the output directory exists.
    output_path = f"{CFG.s3_prefix}/output"
    output_missing = not os.path.exists(output_path)
    if output_missing:
        os.makedirs(output_path, exist_ok=True)
        logger.warning(f"创建缺失的output目录: {output_path}")

    logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
    logger.info(f"浏览器配置: {CFG.chrome_config_ini}")
@signals.worker_shutdown.connect
def on_worker_shutdown(sender=None, **kwargs):
    """Log the start and end of worker shutdown.

    Registered as a receiver of the ``worker_shutdown`` signal. No actual
    cleanup is performed yet; the body only marks where it would go.

    Args:
        sender: Signal sender; not used by this handler.
        **kwargs: Extra signal arguments; not used by this handler.
    """
    logger.info("Worker正在关闭,执行清理操作...")

    # Placeholder: close database connections, release resources, etc.

    logger.info("Worker关闭完成")
-
# Celery application for this project. The result backend and broker URLs are
# read from CFG; the modules listed in ``include`` are passed to Celery so it
# can import them.
# NOTE(review): `celery_reulst_url` looks like a misspelling of
# "celery_result_url", and `crawl_asin_exract_task` of "extract". Both must
# match names defined outside this file, so they are left as-is — confirm.
app = Celery(
    'copywriting_production',
    broker=CFG.redis_url,
    backend=CFG.celery_reulst_url,
    include=[
        'src.tasks.crawl_asin_save_task',
        'src.tasks.crawl_asin_exract_task',
    ],
)

# Load the remaining Celery settings from the project's celery_config module.
app.config_from_object(celery_config)
|