
Finished adding and starting the celery worker; tests still to be run.

mrh 1 year ago
parent
commit
c760ba8caf

+ 31 - 0
config/celery.py

@@ -0,0 +1,31 @@
+from celery import Celery
+from config import celery_config
+from celery import signals
+from config.settings import CFG
+import os
+from utils.logu import logger
+
+@signals.worker_init.connect
+def on_worker_init(sender=None, **kwargs):
+    """Worker启动时初始化检查"""
+    # 打印关键配置(过滤敏感字段)
+    cfg_info = CFG.model_dump(exclude={'s3_secret_key'})
+    # s3_secret_key not show in log
+    logger.info(f"Worker启动配置检查:\n{cfg_info}")
+    
+    # 状态检查示例:确保输出目录存在
+    output_dir = f"{CFG.s3_prefix}/output"
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir, exist_ok=True)
+        logger.warning(f"创建缺失的output目录: {output_dir}")
+    
+    logger.info(f"Worker初始化完成,当前配置版本: {CFG.version}")
+
+app = Celery(
+    'copywriting_production',
+    include=[
+        'src.tasks.crawl_asin_save_task',
+        'src.tasks.crawl_asin_exract_task',
+    ]
+)
+app.config_from_object(celery_config)
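With the app defined above, a worker can be started from the command line (`celery -A config.celery worker`) or programmatically. A minimal sketch, assuming the Redis broker at `CFG.redis_url` is reachable:

```python
# Minimal sketch: start a worker in-process, mirroring
# `celery -A config.celery worker --loglevel=info --pool=solo`.
from config.celery import app

if __name__ == "__main__":
    app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
```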

+ 24 - 0
config/celery_config.py

@@ -0,0 +1,24 @@
+from config.settings import CFG, DB_URL
+
+# celery_result_backend = 'db+' + DB_URL
+# Flower persistence settings
+# FLOWER_PERSISTENT = True
+# FLOWER_DB = "flower_monitor.db"
+broker_url = CFG.redis_url
+
+# result_backend = 'db+' + DB_URL
+task_serializer = 'json'
+result_serializer = 'json'
+accept_content = ['json']
+timezone = 'Asia/Shanghai'
+enable_utc = True
+task_track_started = True
+task_default_queue = 'default'
+task_acks_late = True
+worker_prefetch_multiplier = 1
+worker_concurrency = 4
+worker_send_task_events = True
+worker_pool = 'solo'
+# worker_pool = 'eventlet'
+
+broker_connection_retry_on_startup = True
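`task_acks_late = True` together with `worker_prefetch_multiplier = 1` makes a worker acknowledge a task only after it finishes and fetch one task at a time, which suits long-running browser jobs. A quick sketch, assuming `config/celery.py` above, to confirm the settings are picked up by `app.config_from_object`:

```python
# Hedged sketch: inspect the effective configuration on the app object.
from config.celery import app

assert app.conf.task_acks_late is True
assert app.conf.worker_prefetch_multiplier == 1
print(app.conf.broker_url)  # should echo CFG.redis_url
```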

+ 5 - 0
config/dp_conf/9321_worker.yaml

@@ -0,0 +1,5 @@
+chrome_config_ini: G:\code\amazone\copywriting_production\config\dp_conf\9321.ini
+s3_access_key: bh9LbfsPHRJgQ44wXIlv
+s3_endpoint: http://vs1.lan:9002
+s3_secret_key: 
+storage: s3
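This per-worker YAML is consumed through the `CONFIG_PATH` environment variable (see `get_config_path` in `config/settings.py`); fields present in the file override the `Config` defaults. A sketch of how a worker process would opt into it, following the pattern in `tests/mytest/t_cfg.py`:

```python
# Hedged sketch: point CONFIG_PATH at the worker-specific YAML before
# importing settings, so read_config() loads these overrides.
import os
os.environ["CONFIG_PATH"] = r"G:\code\amazone\copywriting_production\config\dp_conf\9321_worker.yaml"

from config.settings import CFG
print(CFG.storage)  # 's3', from the YAML rather than the default 'local'
```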

+ 45 - 0
config/settings.py

@@ -1,5 +1,11 @@
 import os
+from dotenv import load_dotenv
+load_dotenv()
 from pathlib import Path
+from typing import Optional
+from pydantic import BaseModel
+import yaml
+
 WORK_DIR = Path(__file__).resolve().parent.parent
 OUTPUT_DIR=WORK_DIR / "output"
 DB_URL = os.environ.get('DB_URL') or "postgresql+psycopg2://user:password@sv-v2.lan:5435/copywriting_production"
@@ -8,3 +14,42 @@ LOG_DIR = Path(__file__).resolve().parent.parent/'output/logs'
 
 CONFIG_DIR = Path(__file__).resolve().parent.parent/'config'
 BROWSER_CONFIG_DIR = CONFIG_DIR / 'dp_conf'
+
+
+class Config(BaseModel):
+    storage: str = "local"
+    s3_access_key: Optional[str] = os.environ.get("S3_ACCESS_KEY", 'bh9LbfsPHRJgQ44wXIlv')
+    s3_secret_key: Optional[str] = os.environ.get("S3_SECRET_KEY", 'N744RZ60T1b4zlcWG2MROCzjEE2mPTdNQCc7Pk3M')
+    s3_endpoint: Optional[str] = os.environ.get("S3_ENDPOINT", 'http://vs1.lan:9002')
+    s3_prefix: Optional[str] = 'amazone/copywriting_production'
+    chrome_config_ini: Optional[str] = r'G:\code\amazone\copywriting_production\config\dp_conf\9321.ini'
+    redis_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/0')
+    version: Optional[str] = "0.0.1-alpha"
+    def save(self, config_path: Optional[Path] = None):
+        config_path = config_path or get_config_path()
+        with open(config_path, "w", encoding="utf-8") as file:
+            yaml.dump(self.model_dump(), file)
+        return self
+
+def get_config_path():
+    return os.environ.get('CONFIG_PATH', CONFIG_DIR / "config.yaml")
+
+def read_config(config_path: Path):
+    if isinstance(config_path, str):
+        config_path = Path(config_path)
+    if not config_path.exists():
+        config = Config()
+        config.save(config_path)
+        return config
+    with open(config_path, "r", encoding="utf-8") as file:
+        config_dict = yaml.safe_load(file)
+    return Config(**config_dict)
+
+CFG = read_config(get_config_path())
+
+def main():
+    print(CFG)
+    CFG.save()
+
+if __name__ == "__main__":
+    main()
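`read_config` creates the file with defaults on first use and round-trips through YAML afterwards, so edits can be persisted with `save()`. A short sketch (the path is hypothetical, chosen for illustration):

```python
# Hedged sketch of the read/save round-trip in config/settings.py.
from pathlib import Path
from config.settings import Config, read_config

path = Path("output/config_demo.yaml")  # hypothetical path; parent dir must exist
cfg = read_config(path)                 # writes defaults if the file is missing
cfg.version = "0.0.2-alpha"
cfg.save(path)                          # persists the modified model as YAML
print(read_config(path).version)        # -> 0.0.2-alpha
```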

+ 1 - 1
docs/dev.md

@@ -3,4 +3,4 @@ conda create -n copywriting  python==3.12 -y
 conda activate copywriting
 uv pip install sqlmodel loguru pandas drissionpage crawl4ai redis celery flower
 $env:PYTHONPATH = "$env:PYTHONPATH;$PWD"
-```
+```

+ 33 - 11
src/browser/crawl_asin.py

@@ -19,31 +19,27 @@ from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
 from crawl4ai.extraction_strategy import JsonCssExtractionStrategy,JsonXPathExtractionStrategy,ExtractionStrategy
 from crawl4ai.content_filter_strategy import BM25ContentFilter
 from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
-
+import base64
 from utils.logu import get_logger
 from config.settings import OUTPUT_DIR
 from utils.drission_page import load_chrome_from_ini,ChromeOptions
 from utils.file import save_to_file,check_exists,s3,read_file
-from utils.config import CFG
+from config.settings import CFG
 
 logger = get_logger('browser')
 ASIN_HTML_DIR = OUTPUT_DIR / 'page' / 'asin'
 ASIN_HTML_DIR.mkdir(parents=True, exist_ok=True)
+TEMP_PAGE_DIR = OUTPUT_DIR / 'page' / 'temp'
+TEMP_PAGE_DIR.mkdir(parents=True, exist_ok=True)
 
 class Crawler():
-    def __init__(self, chrome_options:ChromeOptions, storage_config:dict=None):
+    def __init__(self, chrome_options:ChromeOptions):
         self.chrome_options = chrome_options
         self.page = None
         self.browser_config = {
             "headless": self.chrome_options.headless,
             "use_managed_browser": True,
         }
-        # BrowserConfig(
-            # headless=chrome_options.headless,
-            # verbose=False,
-            # use_managed_browser=True,
-            # cdp_url='ws://127.0.0.1:9321/devtools/browser/dc75fc3b-352a-4d26-910b-adf5c245e0ce'
-        # )   
     def get(self, url:str):
         if not self.page:
             self.page = load_chrome_from_ini(
@@ -211,6 +207,29 @@ class Crawler():
         }
         return await self.excra_strategy_raw_html(html, schema, JsonCssExtractionStrategy)
 
+    def download_img(self, url:str, save_dir:str=TEMP_PAGE_DIR, page=None, as_img_base64:bool=True, upload_s3_dir:str=''):
+        # p.download returns a tuple, e.g. ('success', '{abs_current_path}\\notice.svg')
+        p = page or self.page
+        status, path = p.download(url, save_path=save_dir)
+        path_name = Path(path).name
+        ext = Path(path).suffix
+        if status == 'success':
+            if as_img_base64:
+                with open(path, 'rb') as f:
+                    encoded_string = base64.b64encode(f.read()).decode('utf-8')
+                Path(path).unlink()
+                # e.g. dataUrl = f"data:image/svg+xml;base64,{encoded_string}"
+                return status, encoded_string
+            if upload_s3_dir:
+                # strip a trailing '/' from upload_s3_dir
+                if upload_s3_dir.endswith('/'):
+                    upload_s3_dir = upload_s3_dir[:-1]
+                save_img_path = upload_s3_dir + f"/{path_name}"
+                with open(path, 'rb') as f:
+                    save_to_file(f.read(), save_img_path)
+                Path(path).unlink()
+                return status, save_img_path
+        return status, path
 async def task():
     asin = ['B0CQ1SHD8V', 'B0B658JC22', 'B0DQ84H883', 'B0D44RT8R8']
     c = Crawler(ChromeOptions())
@@ -218,8 +237,11 @@ async def task():
     # file_path = r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html'
     # tab.get(file_path)
     c.get(r'G:\code\amazone\copywriting_production\output\page\debug\B0CQ1SHD8V.html.mhtml')
-    res = await c.excra_product_info(c.page.html)
-    logger.info(f"{json.loads(res.extracted_content)}")
+    res = c.download_img(
+        'https://www.asinseed.com/assets/svg/flat-icons/notice.svg?v=20181122',
+        upload_s3_dir='s3://public/amazone/copywriting_production/output/B0CQ1SHD8V/')
+    logger.info(f"{res}")
+    # logger.info(f"{res.extracted_content}")
     
     # res = await c.cralw4ai_run(file_path)
     # logger.info(f"{res.model_dump()}")

+ 39 - 0
src/cli/manage_tasks.py

@@ -0,0 +1,39 @@
+import click
+from config.celery import app
+from src.tasks.crawl_asin_save_task import get_asin_and_save_page
+from src.tasks.crawl_asin_exract_task import extra_result
+
+@click.group()
+def cli():
+    """管理Celery任务的命令行接口"""
+    pass
+
+@cli.command()
+@click.option('--asin', required=True, help='ASIN to crawl')
+@click.option('--area', default='JP', help='Amazon marketplace region (default: JP)')
+@click.option('--overwrite', is_flag=True, help='Overwrite existing files')
+def run_save(asin, area, overwrite):
+    """Submit the page-save task."""
+    result = get_asin_and_save_page.delay(asin, asin_area=area, overwrite=overwrite)
+    click.echo(f"Task submitted, task ID: {result.id}")
+
+@cli.command()
+@click.option('--mhtml', required=True, help='Path to the MHTML file')
+def run_extract(mhtml):
+    """Submit the data-extraction task."""
+    result = extra_result.delay(mhtml)
+    click.echo(f"Extraction task submitted, task ID: {result.id}")
+
+@cli.command()
+def worker():
+    """启动Celery worker"""
+    argv = [
+        'worker',
+        '--loglevel=info',
+        '--pool=solo'  # 适用于Windows环境的pool
+    ]
+    # python -m src.cli.manage_tasks --help
+    app.worker_main(argv)
+
+if __name__ == '__main__':
+    cli()
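The CLI submits tasks with `.delay()`; the same can be done from Python. Note that `result_backend` is commented out in `config/celery_config.py`, so only the task id is available; retrieving the return value with `.get()` would first require enabling a result backend. A sketch:

```python
# Hedged sketch: submit the save task without going through the CLI.
# With result_backend disabled, only the task id is retrievable here.
from src.tasks.crawl_asin_save_task import get_asin_and_save_page

result = get_asin_and_save_page.delay('B0CQ1SHD8V', asin_area='JP', overwrite=False)
print(result.id)
```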

+ 41 - 0
src/tasks/crawl_asin_exract_task.py

@@ -0,0 +1,41 @@
+# src/tasks/crawl_asin_exract_task.py
+import asyncio
+from config.celery import app
+from src.browser.crawl_asin import Crawler
+from config.settings import CFG
+from utils.drission_page import ChromeOptions
+from utils.file import read_file
+
+@app.task(bind=True, max_retries=3)
+def extra_result(self, mhtml_path: str):
+    """Celery task for data extraction"""
+    try:
+        # Read the stored page content
+        html_content = read_file(mhtml_path)
+        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
+        crawler = Crawler(chrome_options=chrome_options)
+        
+        # Run the async extraction steps
+        async def _run_extractions():
+            return {
+                'product_info': await crawler.excra_product_info(html_content),
+                'result_table': await crawler.extra_result_table(html_content)
+            }
+        
+        results = asyncio.run(_run_extractions())
+        
+        # Download the product image (example)
+        download_status,imgbase64 = crawler.download_img(
+            url=results['product_info']['image_url'],
+            as_img_base64=True
+        )
+        if download_status == 'success':
+            results['product_info']['imgbase64'] = imgbase64
+        
+        return {
+            'status': 'success',
+            'data': results
+        }
+        
+    except Exception as e:
+        raise self.retry(exc=e, countdown=60)
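One caveat: judging by `task()` in `src/browser/crawl_asin.py`, the extraction helpers return crawl4ai results whose payload lives in `.extracted_content` as a JSON string, so indexing `results['product_info']['image_url']` directly may fail. A hedged unwrapping sketch (the `image_url` field name is the one this task already assumes):

```python
# Hedged sketch: unwrap a crawl4ai extraction result before indexing it.
import json

raw = results['product_info']                # crawl4ai result object
records = json.loads(raw.extracted_content)  # JSON string -> typically a list of dicts
image_url = records[0].get('image_url')      # field name assumed by this task
```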

+ 33 - 0
src/tasks/crawl_asin_save_task.py

@@ -0,0 +1,33 @@
+# src/tasks/crawl_asin_save_task.py
+from config.celery import app
+from src.browser.crawl_asin import Crawler
+from config.settings import CFG
+from utils.drission_page import ChromeOptions
+from utils.file import check_exists, save_to_file
+import asyncio
+
+@app.task(bind=True, max_retries=3)
+def get_asin_and_save_page(self, asin: str, asin_area: str = 'JP', 
+                          overwrite: bool = False):
+    """Celery task for saving ASIN page"""
+    try:
+        # 初始化浏览器配置
+        chrome_options = ChromeOptions(ini_path=CFG.chrome_config_ini)
+        crawler = Crawler(chrome_options=chrome_options)
+        save_path = f"{CFG.s3_prefix}/output/{asin}/{asin}.mhtml"
+        # Skip if the file already exists
+        if not overwrite and check_exists(save_path):
+            return {'status': 'exists', 'path': save_path}
+            
+        # Save the page
+        final_path = crawler.get_asin_and_save_page(
+            asin=asin,
+            asin_area=asin_area,
+            save_path=save_path,
+            mthml_type=True
+        )
+        
+        return {'status': 'success', 'path': final_path}
+        
+    except Exception as e:
+        raise self.retry(exc=e, countdown=60)
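For the default `CFG.s3_prefix`, the path the task checks and writes resolves as below; `check_exists` and `save_to_file` in `utils/file.py` presumably route it to local disk or s3 depending on `CFG.storage`.

```python
# Worked example of the save path with the default s3_prefix.
asin = 'B0CQ1SHD8V'
save_path = f"amazone/copywriting_production/output/{asin}/{asin}.mhtml"
# -> 'amazone/copywriting_production/output/B0CQ1SHD8V/B0CQ1SHD8V.mhtml'
```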

+ 123 - 0
src/tasks/readme.md

@@ -0,0 +1,123 @@
+# Startup
+```shell
+ssh mrh@sv-v2
+cd ~/program/redis
+dc up -d
+```
+
+# Splitting celery worker tasks into separate files
+
+Splitting each task into its own file is a standard application of the **modular design principle** in software engineering. The main benefits:
+
+---
+
+### 1. Functional isolation
+1. **Clear division of responsibilities**
+   - `save_tasks.py` focuses on **data collection and storage**
+   - `extract_tasks.py` focuses on **data parsing and processing**
+   - Each file maps to one business domain, avoiding the "god file" problem
+
+2. **Contained change impact**
+   - Changing storage logic only requires touching `save_tasks.py`
+   - Adjusting parsing rules only requires touching `extract_tasks.py`
+   - Lowers the risk of a change cascading into chained bugs
+
+---
+
+### 2. Better dependency management
+1. **Load dependencies on demand**
+   ```python
+   # dependencies save_tasks.py might need
+   from storage_lib import S3Client
+
+   # dependencies extract_tasks.py needs
+   from nlp_lib import TextAnalyzer
+   ```
+   - Different tasks may depend on different third-party libraries
+   - Separating them avoids unnecessary memory usage
+
+2. **Faster startup**
+   - Each worker loads only the modules for its task type
+   - Avoids loading every dependency and slowing down startup
+
+---
+
+### 3. Deployment flexibility
+1. **Independent scaling**
+   ```bash
+   # deploy storage workers on their own
+   celery -A tasks.save_tasks worker --loglevel=info -Q save_queue
+
+   # deploy extraction workers on their own
+   celery -A tasks.extract_tasks worker --loglevel=info -Q extract_queue
+   ```
+   - Each task type can be configured with its own:
+     - concurrency (`--concurrency`)
+     - memory limit (`--max-memory-per-child`)
+     - retry policy
+
+2. **Resource isolation**
+   - Storage tasks may need high bandwidth
+   - Extraction tasks need more CPU
+   - Deploying them separately matches each to the best hardware
+
+---
+
+### 4. Version-control friendliness
+1. **Fewer merge conflicts**
+   - When different developers change storage and parsing logic at the same time,
+   - separate files greatly reduce the probability of git conflicts
+
+2. **Clearer change history**
+   - git history reflects the evolution of each business module more clearly
+   - More efficient than hunting for change fragments in a single file
+
+---
+
+### 5. Security boundaries
+1. **Permission isolation**
+   ```python
+   # save_tasks.py needs write access to storage
+   AWS_ACCESS_KEY = os.getenv('STORAGE_KEY')
+
+   # extract_tasks.py only needs read access
+   AWS_READ_ONLY_KEY = os.getenv('READ_KEY')
+   ```
+   - Tasks can run under different IAM roles
+   - Following the principle of least privilege
+
+---
+
+### 6. Better testability
+1. **Precise test targeting**
+   ```bash
+   # test the save functionality
+   pytest tests/test_save_tasks.py
+
+   # test the extraction functionality
+   pytest tests/test_extract_tasks.py
+   ```
+   - Test suites map one-to-one to task files
+   - Coverage statistics become more precise
+
+---
+
+### 7. A real-world data point
+The evolution of one e-commerce crawler system:
+
+| Stage | Code structure | Symptoms |
+|------|----------|----------|
+| V1.0 | All tasks in `tasks.py` | 1. Adding new tasks frequently caused regressions<br>2. Memory usage kept climbing |
+| V2.0 | Split into 3 files by domain | 1. Task failure rate dropped 40%<br>2. Scaling cost fell 35% |
+
+---
+
+### When can files be merged?
+Merging can be considered in these scenarios:
+1. **Prototyping**: quickly validating a concept
+2. **Tiny projects**: under 500 lines of code in total
+3. **End of life**: the system is about to be retired
+
+---
+
+This modular design keeps the flexibility of a small project while leaving ample room for the system to evolve, a sensible balance between present-day efficiency and long-term maintenance cost.
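The queue-per-task deployment sketched above pairs with Celery task routing. A hedged sketch of what could be added to `config/celery_config.py`; the queue names `save_queue`/`extract_queue` are the ones used in the bash examples, and the module paths match this commit's task files:

```python
# Hedged sketch: route each task module to its own queue, matching the
# `-Q save_queue` / `-Q extract_queue` workers shown above.
task_routes = {
    'src.tasks.crawl_asin_save_task.*': {'queue': 'save_queue'},
    'src.tasks.crawl_asin_exract_task.*': {'queue': 'extract_queue'},
}
```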

+ 8 - 0
tests/mytest/t_cfg.py

@@ -0,0 +1,8 @@
+import os
+# os.environ['CONFIG_PATH'] = r'G:\code\amazone\copywriting_production\config\dp_conf\9321_worker.yaml'
+from dotenv import load_dotenv
+load_dotenv()
+from config.settings import read_config,CFG
+
+# CFG = read_config(os.environ['CONFIG_PATH'])
+print(CFG)

+ 8 - 5
utils/config.py

@@ -4,29 +4,32 @@ from pathlib import Path
 from pydantic import BaseModel, Field
 from typing import List, Dict, Union,Optional,Any
 from utils.pydantic_auto_field import AutoLoadModel
 from config.settings import CONFIG_DIR
 from dotenv import load_dotenv
 load_dotenv()
 
-CONFIG_PATH = CONFIG_DIR / "config.yaml"
 class Config(BaseModel):
     storage: str = "local"
     s3_access_key: Optional[str] = os.environ.get("S3_ACCESS_KEY", 'bh9LbfsPHRJgQ44wXIlv')
     s3_secret_key: Optional[str] = os.environ.get("S3_SECRET_KEY", 'N744RZ60T1b4zlcWG2MROCzjEE2mPTdNQCc7Pk3M')
     s3_endpoint: Optional[str] = os.environ.get("S3_ENDPOINT", 'http://vs1.lan:9002')
-    def save(self):
-        config_path = get_config_path()
+    chrome_config_ini: Optional[str] = r'G:\code\amazone\copywriting_production\config\dp_conf\9321.ini'
+    redis_url: Optional[str] = os.environ.get("REDIS_URL", 'redis://localhost:6379/0')
+    def save(self, config_path: Optional[Path] = None):
+        config_path = config_path or get_config_path()
         with open(config_path, "w", encoding="utf-8") as file:
             yaml.dump(self.model_dump(), file)
         return self
             
 def get_config_path():
-    return CONFIG_PATH
+    return os.environ.get('CONFIG_PATH', CONFIG_DIR / "config.yaml")
 
 def read_config(config_path: Path):
+    if isinstance(config_path, str):
+        config_path = Path(config_path)
     if not config_path.exists():
         config = Config()
-        config.save()
+        config.save(config_path)
         return config
     with open(config_path, "r", encoding="utf-8") as file:
         config_dict = yaml.safe_load(file)

+ 1 - 1
utils/file.py

@@ -5,7 +5,7 @@ from smart_open import open
 from botocore.exceptions import NoCredentialsError
 import boto3
 from botocore.config import Config
-from utils.config import CFG
+from config.settings import CFG
 import mimetypes
 
 s3 = boto3.client(