Browse Source

备份。 celery 无法很好支持 playwright 上下文

mrh 10 tháng trước cách đây
mục cha
commit
ad5526ea13

+ 0 - 391
architecture.md

@@ -1,391 +0,0 @@
-# 轮询架构
-在你的场景中,Worker 是多机分布部署的,任务执行时间较长,且可能因错误被终止。为了确保任务的可追踪性和实时性,设计一个能够实时监控任务进度、记录失败点并支持任务恢复的系统是非常重要的。以下是针对你的需求的优化设计方案:
-
----
-
-### **三、推荐方案**
-结合你的需求(实时性、任务恢复、多机部署),推荐以下方案:
-1. **任务进度存储**:
-   - 使用 **中心化数据库(SQL Server)** 实时存储任务进度,确保数据可靠性和任务可恢复性。
-2. **任务监控**:
-   - 使用 **WebSocket 实时推送** 或 **Redis Pub/Sub** 实现任务进度的实时监控,避免轮询的开销。
-3. **错误处理**:
-   - 在数据库中记录任务失败时的页码和错误信息,支持从失败点继续执行。
-
----
-
-### **四、示例架构**
-```
-+----------------+       +-----------------+       +----------------+
-|   Worker 1     |       |   Worker 2      |       |   Worker N     |
-| (更新进度)      |       | (更新进度)       |       | (更新进度)      |
-+-------+--------+       +--------+--------+       +--------+-------+
-        |                         |                         |
-        |                         |                         |
-        |                         |                         |
-+-------v-------------------------v-------------------------v-------+
-|                            Redis Pub/Sub                          |
-| (实时广播任务进度)                                                 |
-+-----------------------------------+-------------------------------+
-                                    |
-                                    |
-+-----------------------------------v-------------------------------+
-|                            Master (WebSocket)                     |
-| (接收进度更新,存储到数据库)                                        |
-+-----------------------------------+-------------------------------+
-                                    |
-                                    |
-+-----------------------------------v-------------------------------+
-|                            SQL Server                             |
-| (存储任务进度、失败信息)                                            |
-+-------------------------------------------------------------------+
-```
-
----
-
-### **五、总结**
-- **任务进度存储**:推荐使用中心化数据库(SQL Server)实时存储任务进度,确保数据可靠性和任务可恢复性。
-- **任务监控**:推荐使用 WebSocket 或 Redis Pub/Sub 实现实时进度推送,避免轮询的开销。
-- **错误处理**:在数据库中记录任务失败时的页码和错误信息,支持从失败点继续执行。
-
-通过以上设计,你的系统将具备高实时性、高可靠性和任务可恢复性,能够有效管理多机分布部署的 Worker 节点。
-
-# 架构
-
-### **一、Master 节点的核心职责与实现**
-1. **Redis 服务**  
-   - **角色**:作为任务队列和状态缓存中心。
-   - **关键设计**:
-     - 使用 `List` 或 `Stream` 结构存储待执行的任务(如 `crawler:tasks`)。
-     - 用 `Hash` 存储任务元数据(如任务ID、URL、优先级、重试次数)。
-     - 用 `Set` 或 **布隆过滤器** 实现 URL 去重。
-     - 通过 `Pub/Sub` 广播全局指令(如暂停、终止任务)。
-
-2. **Web 服务**  
-   - **角色**:任务提交、状态监控、用户交互入口。
-   - **实现方案**:
-     - 使用 Flask/Django 提供 REST API,支持任务提交和查询。
-     - 集成可视化面板(如 Grafana)展示实时爬取指标(成功率、QPS)。
-     - 提供任务配置界面(动态调整爬取频率、代理规则)。
-
-3. **SQL Server**  
-   - **角色**:存储结构化数据(爬取结果、任务日志、用户配置)。
-   - **表设计示例**:
-     - `tasks`: 任务ID、状态、创建时间、优先级。
-     - `results`: 清洗后的结构化数据(标题、作者、正文)。
-     - `logs`: 错误日志、重试记录。
-
-4. **文件服务器**  
-   - **角色**:存储非结构化数据(原始HTML、图片、PDF等)。
-   - **实现方案**:
-     - 使用 MinIO 或 AWS S3 兼容的存储服务。
-     - 按 `任务ID/时间戳` 组织目录结构,例如 `/data/task_001/20240401/page.html`。
-
----
-
-### **二、Worker 节点的核心职责与实现**
-1. **任务执行流程**  
-   ```python
-   while True:
-       task = redis.blpop("crawler:tasks", timeout=30)  # 阻塞获取任务
-       if task:
-           try:
-               # 使用 Chrome 渲染动态页面
-               html = chrome_driver.get(task.url)
-               # 用 Trafilatura 解析内容
-               data = trafilatura.extract(html)
-               # 存储到 SQL Server 和文件服务器
-               save_to_sql(data)
-               save_to_fileserver(html)
-               # 标记任务成功
-               redis.hset(f"task:{task.id}", "status", "success")
-           except Exception as e:
-               handle_retry(task, e)  # 重试逻辑
-   ```
-
-2. **关键技术选型**  
-   - **动态渲染**:使用 `Selenium` 或 `Playwright` 控制 Headless Chrome。
-   - **高效解析**:`Trafilatura` 或 `Parsel`(Scrapy 的解析库)。
-   - **反反爬虫**:集成代理池(如 `ProxyMesh`)、随机 User-Agent 轮换。
-
-3. **资源隔离与扩展**  
-   - 每个 Worker 运行在独立 Docker 容器中,避免环境冲突。
-   - 根据 Redis 队列长度动态扩缩容(K8s HPA 或云厂商自动伸缩组)。
-
----
-
-### **三、关键问题解决方案**
-1. **任务去重**  
-   - 在 Redis 中为每个域名维护一个布隆过滤器(`redisbloom` 模块),拦截已爬URL。
-
-2. **分布式锁**  
-   - 使用 `Redlock` 算法保证同一任务不会被多个 Worker 重复执行:
-     ```python
-     lock = redlock.Redlock([redis], retry_count=3)
-     if lock.acquire("task_lock:123", 5000):  # 5秒锁超时
-         process_task()
-     ```
-
-3. **错误处理**  
-   - 设计三级重试策略:
-     - 首次失败:立即重试(网络抖动)。
-     - 二次失败:延迟 5 分钟重试(目标站点限流)。
-     - 三次失败:标记为永久失败,人工介入。
-
-4. **监控告警**  
-   - 指标采集:通过 Prometheus 暴露 Worker 的 CPU/内存使用率、任务吞吐量。
-   - 日志聚合:使用 ELK(Elasticsearch + Logstash + Kibana)集中分析错误日志。
-
----
-
-### **四、架构图示例**
-```
-+----------------+       +-----------------+
-|   User         |       |   Web Dashboard |
-| (提交任务)      +-------> (监控/配置)      |
-+----------------+       +--------+--------+
-                                   |
-                                   | API
-+------------------+        +------v--------+       +----------------+
-|   SQL Server     <--------+   Master      +------->   Redis        |
-| (结构化数据)     |        | (Web服务)      |       | (任务队列/状态) |
-+------------------+        +------+--------+       +--------+-------+
-                                   |                          |
-                                   |                          | 任务分发
-+------------------+        +------v-------------------------v-------+
-|   File Server    <--------+               Workers                  |
-| (原始HTML/文件)  |        | (Scrapy/Chrome/Trafilatura)            |
-+------------------+        +----------------------------------------+
-```
-
----
-
-### **五、部署优化建议**
-1. **Master 高可用**  
-   - Redis 部署哨兵模式或集群。
-   - SQL Server 配置 AlwaysOn 可用性组。
-
-2. **Worker 异构调度**  
-   - 根据任务类型(动态渲染 vs 静态抓取)划分队列,例如:
-     - `queue:dynamic` → 需要 Chrome 的 Worker。
-     - `queue:static` → 仅需 Scrapy 的轻量 Worker。
-
-3. **安全加固**  
-   - 使用 VPN 或 VPC 隔离 Master 和 Worker 的通信。
-   - 对 Redis 和 SQL Server 启用 TLS 加密传输。
-
----
-
-通过以上设计,你的系统将具备高可扩展性和容错性,能有效管理分布式爬虫任务,同时合理分配网络和计算资源。
-
-
-
-# Worker 无状态权衡
-
-User:
-这是否意味着 Workers 是无状态的,也就是说 Workers 机器不能在本地存储有关文件或者数据,即便有许多网页数据、文件、图片等资源,也要存储到中心文件服务中?这样会不会对带宽要求太高了
-
-You:
-
-你的观察非常准确!在分布式系统中,Worker 是否需要完全无状态、是否必须将所有数据实时传输到中心存储,取决于具体的业务需求、数据规模和技术成本。以下是更细致的分析和优化方案:
-
----
-
-### **一、Worker 的“状态”与存储策略权衡**
-#### 1. **完全无状态的优缺点**
-   - **优点**:
-     - **高容错性**:Worker 宕机不影响数据,任务可被其他节点接管。
-     - **易扩展**:新增 Worker 无需考虑本地数据迁移。
-     - **一致性保障**:所有数据集中存储,避免分散管理带来的同步问题。
-   - **缺点**:
-     - **带宽压力**:大量原始数据(HTML、图片)实时上传可能成为瓶颈。
-     - **延迟增加**:高频率的小文件上传可能降低整体吞吐量。
-
-#### 2. **允许部分状态的优化方案**
-   - **混合存储策略**:
-     - **核心数据**(结构化结果、任务状态)必须实时写入中心存储(SQL/Redis)。
-     - **非核心数据**(原始HTML、图片等)可暂存 Worker 本地,异步批量上传。
-   - **示例流程**:
-     ```python
-     # Worker 处理任务时:
-     1. 爬取数据 → 解析结构化结果 → 立即写入 SQL Server(关键数据)
-     2. 原始 HTML/图片 → 暂存本地磁盘 → 定时任务批量压缩上传到文件服务器
-     ```
-
----
-
-### **二、降低带宽压力的关键技术**
-#### 1. **数据压缩与去重**
-   - **压缩**:使用 `gzip` 或 `brotli` 压缩原始 HTML,减少传输体积。
-   - **去重**:
-     - 图片/文件哈希去重:通过 `MD5/SHA1` 计算文件指纹,仅上传唯一文件。
-     - 使用 Redis 或布隆过滤器记录已存储的文件指纹。
-
-#### 2. **增量上传与批量合并**
-   - **增量上传**:仅上传新增或修改的内容(需结合页面版本控制)。
-   - **批量合并**:将多个小文件打包为 `.tar.gz` 或 `.parquet` 格式后上传,减少请求次数。
-   - **示例**:
-     ```bash
-     # Worker 本地操作:
-     tar -czvf batch_001.tar.gz /tmp/html/*.html
-     s3cmd put batch_001.tar.gz s3://your-bucket/
-     ```
-
-#### 3. **边缘存储与 CDN**
-   - **场景**:如果爬取的数据需要被频繁访问(如公开的图片资源)。
-   - **方案**:
-     - 直接上传到 CDN(如 Cloudflare、Akamai)。
-     - 利用边缘节点缓存,减少回源带宽压力。
-
-#### 4. **分布式文件系统**
-   - **场景**:超大规模非结构化数据(如百万级图片)。
-   - **方案**:
-     - 使用 **HDFS** 或 **Ceph** 替代中心化文件服务器。
-     - Worker 本地作为 HDFS 的 DataNode,数据块分布式存储,带宽压力分摊。
-
----
-
-### **三、业务场景驱动的存储决策**
-#### 1. **需优先实时上传的场景**
-   - **敏感数据**:合规性要求立即集中存储(如金融、医疗数据)。
-   - **实时分析需求**:数据需立刻进入数仓供下游分析。
-   - **示例**:
-     ```python
-     # 关键字段实时写入
-     save_to_sql({
-         "title": "Example",
-         "content": "Hello World",
-         "url": "https://example.com"
-     })
-     ```
-
-#### 2. **可接受异步上传的场景**
-   - **原始 HTML 存档**:仅用于事后审计,无需实时访问。
-   - **非关键媒体文件**:图片/视频的展示延迟容忍度高。
-   - **示例**:
-     ```python
-     # 原始 HTML 暂存本地,每小时上传一次
-     with open(f"/tmp/html/{task_id}.html", "w") as f:
-         f.write(html)
-     schedule_hourly_upload()  # 触发批量上传
-     ```
-
----
-
-### **四、容错与备份设计**
-#### 1. **Worker 本地数据的可靠性**
-   - **策略**:
-     - 本地数据视为“临时缓存”,中心存储才是权威数据源。
-     - Worker 宕机后,由其他 Worker 重新执行未确认的任务。
-   - **实现**:
-     - 任务完成后,Worker 发送确认信号给 Master。
-     - Master 若超时未收到确认,将任务重新放回队列。
-
-#### 2. **断点续传与一致性**
-   - **断点续传**:记录已上传的文件偏移量,网络中断后恢复上传。
-   - **一致性保障**:
-     - 使用 `Redis` 记录文件上传状态(如 `file:1234 → uploaded`)。
-     - 采用两阶段提交(2PC)确保事务完整性。
-
----
-
-### **五、成本与性能平衡的架构示例**
-#### 1. **中小规模爬虫(低成本优先)**
-   ```
-   Worker 本地存储(原始HTML) 
-     → 每日批量上传至 NAS/SFTP 
-     → 最终归档到 S3/OSS(低频存储)
-   ```
-
-#### 2. **大规模爬虫(高性能要求)**
-   ```
-   Worker 本地作为 HDFS DataNode
-     → 实时写入 HDFS 分布式文件块
-     → 定期备份到对象存储(如 S3 Glacier)
-   ```
-
-#### 3. **混合云场景(带宽优化)**
-   ```
-   Worker(本地IDC) → 临时存储到本地 NAS
-     → 夜间通过专线同步到云端 S3
-   ```
-
----
-
-### **六、总结**
-- **Worker 无需绝对无状态**:关键数据实时上传,非关键数据可异步处理。
-- **带宽优化是核心**:通过压缩、去重、批量和增量上传降低压力。
-- **业务场景决定设计**:根据数据敏感性、实时性需求和成本选择策略。
-
-通过合理设计,即使 Worker 有部分本地存储,也能在带宽、成本和可靠性之间取得平衡。例如,爬取新闻正文时立即存储结构化结果到 SQL,而原始 HTML 可以每小时批量压缩上传一次,这样对带宽的影响会显著降低。
-
-
-# 设计思路 Master-Worker
-
-在爬虫或分布式系统中,类似“管理者-执行者”这样的架构模式非常常见,通常会用一些更具象化的名称来描述这种分工明确的架构。以下是一些常见的类似称呼:
-
-1. Master-Worker 架构
-Master(管理者):
-
-负责任务的分配、调度和监控。
-
-存储任务状态、爬取规则和数据。
-
-协调多个Worker节点的运行。
-
-Worker(执行者):
-
-负责具体任务的执行,如爬取网页、解析数据。
-
-将结果返回给Master或存储到指定位置。
-
-适用场景:
-
-分布式爬虫系统。
-
-需要动态扩展执行节点的场景。
-
-
-
-以下是一些常用的库和架构:
-
-1. 任务队列 + Worker 模式
-核心思想:
-
-使用任务队列(Message Queue)来分发任务。
-
-Worker 从队列中拉取任务并执行。
-
-常用库:
-
-Celery:
-
-分布式任务队列库,支持异步任务调度和 Worker 管理。
-
-可以与 Redis、RabbitMQ 等消息队列集成。
-
-支持定时任务、任务重试、结果存储等功能。
-
-RQ (Redis Queue):
-
-基于 Redis 的轻量级任务队列。
-
-简单易用,适合小型项目。
-
-Huey:
-
-轻量级任务队列,支持 Redis 或 SQLite 作为后端。
-
-适合小型到中型项目。
-
-架构示例:
-
-Master:负责将任务推送到队列。
-
-Worker:从队列中拉取任务并执行。
-
-适用场景:
-
-需要异步任务处理的系统。
-
-分布式任务调度。

+ 0 - 0
config/pc_configs/pc1.yaml → config/pc_configs/w1.yaml


+ 5 - 0
config/settings.py

@@ -1,3 +1,4 @@
+import os
 from pathlib import Path
 WORK_DIR = Path(__file__).parent.parent.absolute()
 OUTPUT_DIR = WORK_DIR / "output"
@@ -11,3 +12,7 @@ DB_URL = f"sqlite:///{OUTPUT_DIR}/search_results.db"
 
 HTTP_PROXY='http://127.0.0.1:1881'
 HTTPS_PROXY='http://127.0.0.1:1881'
+
+REDIS_URL = 'redis://127.0.0.1:6379/1'
+
+PC_NAME=os.getenv('PC_NAME', 'pc1') 

+ 22 - 16
tests/mytest/t.py

@@ -1,4 +1,7 @@
 import asyncio
+import os
+from pathlib import Path
+import sys
 from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
 import requests
 from bs4 import BeautifulSoup
@@ -6,27 +9,30 @@ import json
 from dotenv import load_dotenv
 from scrapegraphai.graphs import ScriptCreatorGraph
 from scrapegraphai.utils import prettify_exec_info
+import yaml
+import socket
 
-# load_dotenv()
-
-def t_main():
-    url = "https://perinim.github.io/projects"
-    response = requests.get(url)
-    soup = BeautifulSoup(response.content, 'html.parser')
 
-    news_list = []
 
-    for news in soup.find_all('div', class_='news-item'):
-        title = news.find('h2').text.strip()
-        description = news.find('p').text.strip()
-        news_list.append({
-            "title": title,
-            "description": description
-        })
+from mylib.logu import logger
+# load_dotenv()
+config_path = Path(f"config/pc_configs/{'pc1'}.yaml")
+def t_main():
+    
+    if config_path.exists():
+        with open(config_path) as f:
+            config_data = yaml.safe_load(f)
+            logger.info(f"Loaded browser config from {config_path}")
+            logger.info(f"{config_data}")
+    # 获取主机名
+    hostname = socket.gethostname() 
+    ip_address = socket.gethostbyname(hostname)
 
-    print(json.dumps(news_list, indent=4))
+    print("主机名:", hostname)
+    print("IP 地址:", ip_address)
 async def main():
-    t_main()
+    # t_main()
+    print(len(None))
     # s = '''python\nimport requests\nfrom bs4 import BeautifulSoup\nimport json\n\ndef main():\n    url = \"https://perinim.github.io/projects\"\n    response = requests.get(url)\n    soup = BeautifulSoup(response.content, 'html.parser')\n    \n    news_list = []\n    \n    for news in soup.find_all('div', class_='news-item'):\n        title = news.find('h2').text.strip()\n        description = news.find('p').text.strip()\n        news_list.append({\n            \"title\": title,\n            \"description\": description\n        })\n    \n    print(json.dumps(news_list, indent=4))\n\nif __name__ == \"__main__\":\n    main()\n'''
     # print(s)
 if __name__ == "__main__":

+ 7 - 0
worker/celery/app.py

@@ -0,0 +1,7 @@
+from celery import Celery
+from worker.celery import celeryconfig 
+app = Celery(
+    'search_worker',
+    include=['worker.celery.tasks']
+)
+app.config_from_object(celeryconfig)

+ 24 - 0
worker/celery/celeryconfig.py

@@ -0,0 +1,24 @@
+from config.settings import REDIS_URL,DB_URL
+
+celery_result_backend = 'db+' + DB_URL
+# Flower持久化配置
+FLOWER_PERSISTENT = True
+FLOWER_DB = "flower_monitor.db"
+broker_url = REDIS_URL
+
+result_backend = 'db+' + DB_URL
+task_serializer = 'json'
+result_serializer = 'json'
+accept_content = ['json']
+timezone = 'Asia/Shanghai'
+enable_utc = True
+task_track_started = True
+task_default_queue = 'default'
+task_acks_late=True
+worker_prefetch_multiplier=1
+worker_concurrency=1
+worker_send_task_events=True
+worker_pool = 'solo'
+# worker_pool = 'eventlet'
+
+broker_connection_retry_on_startup=True

+ 72 - 0
worker/celery/client.py

@@ -0,0 +1,72 @@
+from worker.celery.app import app
+from worker.celery.models import KeywordTaskModel
+import pandas as pd
+from pathlib import Path
+import logging
+from typing import List
+import sys
+
+# 配置日志
+logging.basicConfig(
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    level=logging.INFO
+)
+logger = logging.getLogger(__name__)
+
+def read_keywords_from_file(file_path: Path) -> List[str]:
+    """读取文件第0列第一行到末尾的内容"""
+    try:
+        # 根据文件类型选择读取方式
+        if file_path.suffix.lower() in ['.xlsx', '.xls']:
+            df = pd.read_excel(file_path, header=0, engine='openpyxl')
+        elif file_path.suffix.lower() in ['.csv', '.tsv']:
+            df = pd.read_csv(file_path, header=0, sep='\t' if file_path.suffix == '.tsv' else ',')
+        else:
+            raise ValueError(f"不支持的格式: {file_path.suffix}")
+            
+        return df.iloc[:, 0].astype(str).tolist()  # 强制转换为字符串类型
+        
+    except Exception as e:
+        logger.error(f"读取文件失败: {str(e)}")
+        raise
+
+def submit_tasks(keywords: List[str]):
+    """提交所有关键词任务"""
+    for keyword in keywords:
+        try:
+            task_data = KeywordTaskModel(keyword=keyword.strip()).model_dump()
+            app.send_task(
+                'worker.celery.tasks.search_single_keyword',
+                args=[task_data],
+            )
+            logger.info(f"已提交任务: {keyword}")
+        except Exception as e:
+            logger.error(f"提交任务失败 [{keyword}]: {str(e)}")
+
+def main(file_path: str):
+    submit_tasks(['Acalypha omissa essential oil'])
+    return 
+    try:
+        path = Path(file_path)
+        if not path.exists():
+            raise FileNotFoundError(f"文件不存在: {path}")
+            
+        keywords = read_keywords_from_file(path)
+        if not keywords:
+            raise ValueError("文件未包含有效关键词")
+            
+        logger.info(f"共读取到 {len(keywords)} 个关键词")
+        logger.info(f"{keywords[:5]}")
+        submit_tasks(keywords[:1])  # 取消注释以启用任务提交
+        logger.info("所有任务已提交完成")
+        
+    except Exception as e:
+        logger.error(f"程序异常终止: {str(e)}")
+        sys.exit(1)
+
+if __name__ == '__main__':
+    if len(sys.argv) != 2:
+        print("使用方法: python -m worker.celery.client <文件路径>")
+        sys.exit(1)
+        
+    main(sys.argv[1])

+ 13 - 0
worker/celery/models.py

@@ -0,0 +1,13 @@
+from pydantic import BaseModel
+
+class KeywordTaskModel(BaseModel):
+    keyword: str
+    max_pages: int = 10
+
+    class Config:
+        json_schema_extra = {
+            "example": {
+                "keyword": "Acampe carinata essential oil",
+                "max_pages": 5
+            }
+        }

+ 30 - 0
worker/celery/readme.md

@@ -0,0 +1,30 @@
+To use this system you would:
+
+ 1 Start Celery worker with: celery -A worker.celery.app worker --loglevel=info --concurrency=1
+ 2 Send tasks using:
+
+脚本方式
+```python
+from worker.celery.models import KeywordTaskModel
+from worker.celery.tasks import search_keyword_task
+task_data = KeywordTaskModel(keyword="Acampe carinata essential oil").model_dump()
+search_keyword_task.delay(task_data)
+```
+命令行方式
+```shell
+celery -A worker.celery.tasks flower --persistent=True --db=".\output\celery\flower_db"
+# 为不同PC启动worker时指定配置
+$env:PC_NAME="w1"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
+$env:PC_NAME="w2"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
+$env:PC_NAME="w3"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
+$env:PC_NAME="w4"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
+$env:PC_NAME="w5"; celery -A worker.celery.app worker --hostname=$env:PC_NAME@%h
+
+
+
+# 单机多worker不同配置(使用不同配置文件)
+
+PC_NAME=worker1_config celery -A worker.celery.app worker --hostname=worker1@%h
+
+PC_NAME=worker2_config celery -A worker.celery.app worker --hostname=worker2@%h
+```

+ 95 - 0
worker/celery/tasks.py

@@ -0,0 +1,95 @@
+import sys
+from worker.celery.app import app
+from worker.search_engine.camoufox_broswer import BrowserConfig, BrowserCore
+from worker.search_engine.google_search import GoogleSearchHandler, search_keyword
+from worker.celery.models import KeywordTaskModel
+import asyncio
+import os
+import json
+import yaml
+from pathlib import Path
+from typing import Optional
+from mylib.logu import logger
+from functools import partial
+
+# 必须显式设置事件循环策略
+if sys.platform == 'win32':
+    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+else:
+    asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
+
+logger.info(f"{app.conf.result_backend}")
+
+def get_browser_config() -> BrowserConfig:
+    """支持多PC配置和自动创建默认配置"""
+    try:
+        pc_name = os.getenv('PC_NAME')
+        if pc_name:
+            config_path = Path(f"worker/conf/{pc_name}.yaml")
+            config_path.parent.mkdir(parents=True, exist_ok=True)
+            
+            if not config_path.exists():
+                default_config = BrowserConfig(
+                    headless=os.getenv('HEADLESS', 'False').lower() == 'true',
+                    proxy=json.loads(os.getenv('PROXY', '{"server": "http://localhost:1881"}')),
+                    init_url=os.getenv('INIT_URL', "about:blank"),
+                )
+                with open(config_path, 'w') as f:
+                    yaml.safe_dump({'BrowserConfig': default_config.model_dump()}, f)
+                logger.info(f"Created default config at {config_path}")
+
+            with open(config_path) as f:
+                config_data = yaml.safe_load(f)
+                default_config = BrowserConfig(**config_data.get('BrowserConfig', {}))
+        else:
+            default_config = BrowserConfig(
+            headless=os.getenv('HEADLESS', 'False').lower() == 'true',
+            proxy=json.loads(os.getenv('PROXY', '{"server": "http://localhost:1881"}')),
+            init_url=os.getenv('INIT_URL', "about:blank"),
+        )
+        return default_config
+    except Exception as e:
+        logger.error(f"加载浏览器配置失败: {str(e)}")
+        raise
+
+@app.task
+def search_single_keyword(task_data: dict):
+    """同步任务包装器,使用独立事件循环"""
+    keyword = task_data['keyword']
+    config = get_browser_config()
+    
+    # 创建新事件循环并确保彻底清理
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    
+    try:
+        result = loop.run_until_complete(
+            search_keyword_wrapper(keyword, config)
+        )
+        return result.model_dump()
+    finally:
+        # 显式清理Playwright资源
+        loop.run_until_complete(loop.shutdown_asyncgens())
+        loop.close()
+        logger.info(f"清理完成 {keyword}")
+        # Windows需要额外清理
+        if sys.platform == 'win32':
+            import gc
+            gc.collect()
+
+async def search_keyword_wrapper(keyword: str, config: BrowserConfig):
+    """异步执行的核心逻辑"""
+    # 禁用单例模式,每次创建新实例
+    core = BrowserCore(config=config)
+    await core.initialize()
+    try:
+        search_handler = GoogleSearchHandler(core.page)
+        return await search_handler.process_keyword(keyword)
+    finally:
+        await core.close()
+
+def main():
+    browser_config = get_browser_config()
+
+if __name__ == "__main__":
+    main()

+ 8 - 0
worker/conf/w1.yaml

@@ -0,0 +1,8 @@
+BrowserConfig:
+  headless: false
+  proxy:
+    server: "http://localhost:9350"
+  init_url: "about:blank"
+  constrains:
+    max_width: 1920
+    max_height: 1200

+ 9 - 0
worker/conf/w2.yaml

@@ -0,0 +1,9 @@
+BrowserConfig:
+  headless: false
+  proxy:
+    server: "http://localhost:9352"
+  init_url: "about:blank"
+  constrains:
+    max_width: 1920
+    max_height: 1200
+

+ 9 - 0
worker/conf/w3.yaml

@@ -0,0 +1,9 @@
+BrowserConfig:
+  headless: false
+  proxy:
+    server: "http://localhost:9354"
+  init_url: "about:blank"
+  constrains:
+    max_width: 1920
+    max_height: 1200
+

+ 9 - 0
worker/conf/w4.yaml

@@ -0,0 +1,9 @@
+BrowserConfig:
+  headless: false
+  proxy:
+    server: "http://localhost:9356"
+  init_url: "about:blank"
+  constrains:
+    max_width: 1920
+    max_height: 1200
+

+ 9 - 0
worker/conf/w5.yaml

@@ -0,0 +1,9 @@
+BrowserConfig:
+  headless: false
+  proxy:
+    server: "http://localhost:9358"
+  init_url: "about:blank"
+  constrains:
+    max_width: 1920
+    max_height: 1200
+

+ 13 - 8
worker/search_engine/google_search.py

@@ -53,7 +53,7 @@ class GoogleSearchHandler():
             
         return result_ele
 
-    async def process_keyword(self, keyword: str):
+    async def process_keyword(self, keyword: str, max_result_items: int = 200):
         key_model = self.db_manager.get_keyword_task(keyword)
         if key_model:
             logger.info(f"关键词任务已完成,跳过处理: {keyword}")
@@ -71,6 +71,9 @@ class GoogleSearchHandler():
             result_ele = await self._process_single_page(keyword, html_dir)
             # 要 if else 0 语法是为了健壮,避免 results 可能为 None,引发异常
             search_result_item_count += len(result_ele.results) if result_ele.results else 0
+            if search_result_item_count > max_result_items:
+                logger.info(f"关键词 {keyword} 单页结果数量超过 {max_result_items} ,跳过处理下一页")
+                break
             if result_ele.next_page_url:
                 await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
                 # await asyncio.sleep(1)
@@ -126,8 +129,6 @@ class GoogleSearchHandler():
         result_list = search_div.xpath('//*[@data-snc]')
         logger.info(f"当前页结果数量: {len(result_list)}")
         
-        res.total_count = len(result_list)
-        
         for result_item in result_list:
             if len(result_item.children) < 2:
                 continue
@@ -147,6 +148,13 @@ class GoogleSearchHandler():
                 res.results.append(result)
         return res
 
+async def search_keyword(keyword, max_result_items=200,  config: BrowserConfig = BrowserConfig()):
+    core = await BrowserCore.get_instance(config = config)
+    search_handler = GoogleSearchHandler(core.page)
+    kw = await search_handler.process_keyword(keyword, max_result_items=max_result_items)
+    await core.close()
+    return kw
+
 async def aio_main(config: BrowserConfig = BrowserConfig()):
     try:
         core = await BrowserCore.get_instance(config)
@@ -160,9 +168,6 @@ async def aio_main(config: BrowserConfig = BrowserConfig()):
             # 'Acamptopappus shockleyi essential oil'
         ]
         # logger.info(f"{search_handler.page.url}")
-        # await search_handler.page.goto('https://www.google.com/search?q=Acamptopappus+shockleyi+essential+oil&sca_esv=115fbc7a5c2d88aa&ei=3GGRZ7OqELqhseMP0cezyA8&start=10&sa=N&sstk=Af40H4VqYvsegeoLl_BGSRs4ANGcOj-yJdvukS8s8Z7OeMF0-7019-ly9KY_ekhOXUB-HniMqLnkuYZsnBsbUKnLm61J0cbQwCdAoA&ved=2ahUKEwiziO3boYqLAxW6UGwGHdHjDPkQ8NMDegQIBxAW')
-        # return 
-        # await search_handler.goto_home_page()
         for keyword in keywords:
             await search_handler.process_keyword(keyword)
         while True:
@@ -170,7 +175,6 @@ async def aio_main(config: BrowserConfig = BrowserConfig()):
             
     except Exception as e:
         logger.error(f"失败: {str(e)}")
-        raise
     finally:
         # 确保最终关闭浏览器
         await core.close()
@@ -199,7 +203,8 @@ def analyze():
 
 def main():
     # analyze()
-    asyncio.run(aio_main())
+    # asyncio.run(aio_main())
+    asyncio.run(search_keyword('Acampe carinata essential oil', BrowserConfig()))
 
 if __name__ == "__main__":
     main()