mrh 10 сар өмнө
parent
commit
3de3e57e9c

+ 1 - 0
.gitignore

@@ -4,3 +4,4 @@ __pycache__
 .vscode
 .aider*
 .env
+.pytest_cache

+ 339 - 0
architecture.md

@@ -0,0 +1,339 @@
+# 架构
+
+### **一、Master 节点的核心职责与实现**
+1. **Redis 服务**  
+   - **角色**:作为任务队列和状态缓存中心。
+   - **关键设计**:
+     - 使用 `List` 或 `Stream` 结构存储待执行的任务(如 `crawler:tasks`)。
+     - 用 `Hash` 存储任务元数据(如任务ID、URL、优先级、重试次数)。
+     - 用 `Set` 或 **布隆过滤器** 实现 URL 去重。
+     - 通过 `Pub/Sub` 广播全局指令(如暂停、终止任务)。
+
+2. **Web 服务**  
+   - **角色**:任务提交、状态监控、用户交互入口。
+   - **实现方案**:
+     - 使用 Flask/Django 提供 REST API,支持任务提交和查询。
+     - 集成可视化面板(如 Grafana)展示实时爬取指标(成功率、QPS)。
+     - 提供任务配置界面(动态调整爬取频率、代理规则)。
+
+3. **SQL Server**  
+   - **角色**:存储结构化数据(爬取结果、任务日志、用户配置)。
+   - **表设计示例**:
+     - `tasks`: 任务ID、状态、创建时间、优先级。
+     - `results`: 清洗后的结构化数据(标题、作者、正文)。
+     - `logs`: 错误日志、重试记录。
+
+4. **文件服务器**  
+   - **角色**:存储非结构化数据(原始HTML、图片、PDF等)。
+   - **实现方案**:
+     - 使用 MinIO 或 AWS S3 兼容的存储服务。
+     - 按 `任务ID/时间戳` 组织目录结构,例如 `/data/task_001/20240401/page.html`。
+
+---
+
+### **二、Worker 节点的核心职责与实现**
+1. **任务执行流程**  
+   ```python
+   while True:
+       task = redis.blpop("crawler:tasks", timeout=30)  # 阻塞获取任务
+       if task:
+           try:
+               # 使用 Chrome 渲染动态页面
+               html = chrome_driver.get(task.url)
+               # 用 Trafilatura 解析内容
+               data = trafilatura.extract(html)
+               # 存储到 SQL Server 和文件服务器
+               save_to_sql(data)
+               save_to_fileserver(html)
+               # 标记任务成功
+               redis.hset(f"task:{task.id}", "status", "success")
+           except Exception as e:
+               handle_retry(task, e)  # 重试逻辑
+   ```
+
+2. **关键技术选型**  
+   - **动态渲染**:使用 `Selenium` 或 `Playwright` 控制 Headless Chrome。
+   - **高效解析**:`Trafilatura` 或 `Parsel`(Scrapy 的解析库)。
+   - **反反爬虫**:集成代理池(如 `ProxyMesh`)、随机 User-Agent 轮换。
+
+3. **资源隔离与扩展**  
+   - 每个 Worker 运行在独立 Docker 容器中,避免环境冲突。
+   - 根据 Redis 队列长度动态扩缩容(K8s HPA 或云厂商自动伸缩组)。
+
+---
+
+### **三、关键问题解决方案**
+1. **任务去重**  
+   - 在 Redis 中为每个域名维护一个布隆过滤器(`redisbloom` 模块),拦截已爬URL。
+
+2. **分布式锁**  
+   - 使用 `Redlock` 算法保证同一任务不会被多个 Worker 重复执行:
+     ```python
+     lock = redlock.Redlock([redis], retry_count=3)
+     if lock.acquire("task_lock:123", 5000):  # 5秒锁超时
+         process_task()
+     ```
+
+3. **错误处理**  
+   - 设计三级重试策略:
+     - 首次失败:立即重试(网络抖动)。
+     - 二次失败:延迟 5 分钟重试(目标站点限流)。
+     - 三次失败:标记为永久失败,人工介入。
+
+4. **监控告警**  
+   - 指标采集:通过 Prometheus 暴露 Worker 的 CPU/内存使用率、任务吞吐量。
+   - 日志聚合:使用 ELK(Elasticsearch + Logstash + Kibana)集中分析错误日志。
+
+---
+
+### **四、架构图示例**
+```
++----------------+       +-----------------+
+|   User         |       |   Web Dashboard |
+| (提交任务)      +-------> (监控/配置)      |
++----------------+       +--------+--------+
+                                   |
+                                   | API
++------------------+        +------v--------+       +----------------+
+|   SQL Server     <--------+   Master      +------->   Redis        |
+| (结构化数据)     |        | (Web服务)      |       | (任务队列/状态) |
++------------------+        +------+--------+       +--------+-------+
+                                   |                          |
+                                   |                          | 任务分发
++------------------+        +------v-------------------------v-------+
+|   File Server    <--------+               Workers                  |
+| (原始HTML/文件)  |        | (Scrapy/Chrome/Trafilatura)            |
++------------------+        +----------------------------------------+
+```
+
+---
+
+### **五、部署优化建议**
+1. **Master 高可用**  
+   - Redis 部署哨兵模式或集群。
+   - SQL Server 配置 AlwaysOn 可用性组。
+
+2. **Worker 异构调度**  
+   - 根据任务类型(动态渲染 vs 静态抓取)划分队列,例如:
+     - `queue:dynamic` → 需要 Chrome 的 Worker。
+     - `queue:static` → 仅需 Scrapy 的轻量 Worker。
+
+3. **安全加固**  
+   - 使用 VPN 或 VPC 隔离 Master 和 Worker 的通信。
+   - 对 Redis 和 SQL Server 启用 TLS 加密传输。
+
+---
+
+通过以上设计,你的系统将具备高可扩展性和容错性,能有效管理分布式爬虫任务,同时合理分配网络和计算资源。
+
+
+
+# Worker 无状态权衡
+
+User:
+这是否意味着 Workers 是无状态的,也就是说 Workers 机器不能在本地存储有关文件或者数据,即便有许多网页数据、文件、图片等资源,也要存储到中心文件服务中?这样会不会对带宽要求太高了
+
+You:
+
+你的观察非常准确!在分布式系统中,Worker 是否需要完全无状态、是否必须将所有数据实时传输到中心存储,取决于具体的业务需求、数据规模和技术成本。以下是更细致的分析和优化方案:
+
+---
+
+### **一、Worker 的“状态”与存储策略权衡**
+#### 1. **完全无状态的优缺点**
+   - **优点**:
+     - **高容错性**:Worker 宕机不影响数据,任务可被其他节点接管。
+     - **易扩展**:新增 Worker 无需考虑本地数据迁移。
+     - **一致性保障**:所有数据集中存储,避免分散管理带来的同步问题。
+   - **缺点**:
+     - **带宽压力**:大量原始数据(HTML、图片)实时上传可能成为瓶颈。
+     - **延迟增加**:高频率的小文件上传可能降低整体吞吐量。
+
+#### 2. **允许部分状态的优化方案**
+   - **混合存储策略**:
+     - **核心数据**(结构化结果、任务状态)必须实时写入中心存储(SQL/Redis)。
+     - **非核心数据**(原始HTML、图片等)可暂存 Worker 本地,异步批量上传。
+   - **示例流程**:
+     ```python
+     # Worker 处理任务时:
+     1. 爬取数据 → 解析结构化结果 → 立即写入 SQL Server(关键数据)
+     2. 原始 HTML/图片 → 暂存本地磁盘 → 定时任务批量压缩上传到文件服务器
+     ```
+
+---
+
+### **二、降低带宽压力的关键技术**
+#### 1. **数据压缩与去重**
+   - **压缩**:使用 `gzip` 或 `brotli` 压缩原始 HTML,减少传输体积。
+   - **去重**:
+     - 图片/文件哈希去重:通过 `MD5/SHA1` 计算文件指纹,仅上传唯一文件。
+     - 使用 Redis 或布隆过滤器记录已存储的文件指纹。
+
+#### 2. **增量上传与批量合并**
+   - **增量上传**:仅上传新增或修改的内容(需结合页面版本控制)。
+   - **批量合并**:将多个小文件打包为 `.tar.gz` 或 `.parquet` 格式后上传,减少请求次数。
+   - **示例**:
+     ```bash
+     # Worker 本地操作:
+     tar -czvf batch_001.tar.gz /tmp/html/*.html
+     s3cmd put batch_001.tar.gz s3://your-bucket/
+     ```
+
+#### 3. **边缘存储与 CDN**
+   - **场景**:如果爬取的数据需要被频繁访问(如公开的图片资源)。
+   - **方案**:
+     - 直接上传到 CDN(如 Cloudflare、Akamai)。
+     - 利用边缘节点缓存,减少回源带宽压力。
+
+#### 4. **分布式文件系统**
+   - **场景**:超大规模非结构化数据(如百万级图片)。
+   - **方案**:
+     - 使用 **HDFS** 或 **Ceph** 替代中心化文件服务器。
+     - Worker 本地作为 HDFS 的 DataNode,数据块分布式存储,带宽压力分摊。
+
+---
+
+### **三、业务场景驱动的存储决策**
+#### 1. **需优先实时上传的场景**
+   - **敏感数据**:合规性要求立即集中存储(如金融、医疗数据)。
+   - **实时分析需求**:数据需立刻进入数仓供下游分析。
+   - **示例**:
+     ```python
+     # 关键字段实时写入
+     save_to_sql({
+         "title": "Example",
+         "content": "Hello World",
+         "url": "https://example.com"
+     })
+     ```
+
+#### 2. **可接受异步上传的场景**
+   - **原始 HTML 存档**:仅用于事后审计,无需实时访问。
+   - **非关键媒体文件**:图片/视频的展示延迟容忍度高。
+   - **示例**:
+     ```python
+     # 原始 HTML 暂存本地,每小时上传一次
+     with open(f"/tmp/html/{task_id}.html", "w") as f:
+         f.write(html)
+     schedule_hourly_upload()  # 触发批量上传
+     ```
+
+---
+
+### **四、容错与备份设计**
+#### 1. **Worker 本地数据的可靠性**
+   - **策略**:
+     - 本地数据视为“临时缓存”,中心存储才是权威数据源。
+     - Worker 宕机后,由其他 Worker 重新执行未确认的任务。
+   - **实现**:
+     - 任务完成后,Worker 发送确认信号给 Master。
+     - Master 若超时未收到确认,将任务重新放回队列。
+
+#### 2. **断点续传与一致性**
+   - **断点续传**:记录已上传的文件偏移量,网络中断后恢复上传。
+   - **一致性保障**:
+     - 使用 `Redis` 记录文件上传状态(如 `file:1234 → uploaded`)。
+     - 采用两阶段提交(2PC)确保事务完整性。
+
+---
+
+### **五、成本与性能平衡的架构示例**
+#### 1. **中小规模爬虫(低成本优先)**
+   ```
+   Worker 本地存储(原始HTML) 
+     → 每日批量上传至 NAS/SFTP 
+     → 最终归档到 S3/OSS(低频存储)
+   ```
+
+#### 2. **大规模爬虫(高性能要求)**
+   ```
+   Worker 本地作为 HDFS DataNode
+     → 实时写入 HDFS 分布式文件块
+     → 定期备份到对象存储(如 S3 Glacier)
+   ```
+
+#### 3. **混合云场景(带宽优化)**
+   ```
+   Worker(本地IDC) → 临时存储到本地 NAS
+     → 夜间通过专线同步到云端 S3
+   ```
+
+---
+
+### **六、总结**
+- **Worker 无需绝对无状态**:关键数据实时上传,非关键数据可异步处理。
+- **带宽优化是核心**:通过压缩、去重、批量和增量上传降低压力。
+- **业务场景决定设计**:根据数据敏感性、实时性需求和成本选择策略。
+
+通过合理设计,即使 Worker 有部分本地存储,也能在带宽、成本和可靠性之间取得平衡。例如,爬取新闻正文时立即存储结构化结果到 SQL,而原始 HTML 可以每小时批量压缩上传一次,这样对带宽的影响会显著降低。
+
+
+# 设计思路 Master-Worker
+
+在爬虫或分布式系统中,类似“管理者-执行者”这样的架构模式非常常见,通常会用一些更具象化的名称来描述这种分工明确的架构。以下是一些常见的类似称呼:
+
+1. Master-Worker 架构
+Master(管理者):
+
+负责任务的分配、调度和监控。
+
+存储任务状态、爬取规则和数据。
+
+协调多个Worker节点的运行。
+
+Worker(执行者):
+
+负责具体任务的执行,如爬取网页、解析数据。
+
+将结果返回给Master或存储到指定位置。
+
+适用场景:
+
+分布式爬虫系统。
+
+需要动态扩展执行节点的场景。
+
+
+
+以下是一些常用的库和架构:
+
+1. 任务队列 + Worker 模式
+核心思想:
+
+使用任务队列(Message Queue)来分发任务。
+
+Worker 从队列中拉取任务并执行。
+
+常用库:
+
+Celery:
+
+分布式任务队列库,支持异步任务调度和 Worker 管理。
+
+可以与 Redis、RabbitMQ 等消息队列集成。
+
+支持定时任务、任务重试、结果存储等功能。
+
+RQ (Redis Queue):
+
+基于 Redis 的轻量级任务队列。
+
+简单易用,适合小型项目。
+
+Huey:
+
+轻量级任务队列,支持 Redis 或 SQLite 作为后端。
+
+适合小型到中型项目。
+
+架构示例:
+
+Master:负责将任务推送到队列。
+
+Worker:从队列中拉取任务并执行。
+
+适用场景:
+
+需要异步任务处理的系统。
+
+分布式任务调度。

+ 0 - 0
mylib/conf/9321.ini → config/conf/9321.ini


+ 0 - 0
mylib/settings.py → config/settings.py


+ 1 - 1
database/excel_import.py

@@ -1,7 +1,7 @@
 """读取Excel文件"""
 import pandas as pd
 from sqlite3 import connect
-from mylib.settings import OUTPUT_DIR, WORK_DIR
+from config.settings import OUTPUT_DIR, WORK_DIR
 from datetime import datetime
 from sqlmodel import Field, SQLModel, create_engine, Session, select, func
 from sqlalchemy import text

+ 1 - 1
database/search_model.py

@@ -1,5 +1,5 @@
 from sqlite3 import connect
-from mylib.settings import OUTPUT_DIR, WORK_DIR
+from config.settings import OUTPUT_DIR, WORK_DIR
 from datetime import datetime
 from sqlmodel import Field, SQLModel, create_engine, Session, select
 from sqlalchemy import text  # Add this import

+ 1 - 1
database/sqlite_engine.py

@@ -1,5 +1,5 @@
 from sqlite3 import connect
-from mylib.settings import OUTPUT_DIR, WORK_DIR
+from config.settings import OUTPUT_DIR, WORK_DIR
 from datetime import datetime
 from sqlmodel import Field, SQLModel, create_engine, Session, select
 from sqlalchemy import text  # Add this import

+ 0 - 0
article.py → demo/article.py


+ 1 - 1
crawl_multi.py → demo/crawl_multi.py

@@ -8,7 +8,7 @@ from dotenv import load_dotenv
 from crawl4ai import *
 from search_keyward import test_dir_links_not_local
 from mylib.base import ensure_output_dir, load_from_pickle, save_to_file
-from mylib.settings import *
+from config.settings import *
 from pathlib import Path
 import markdown
 from docx import Document

+ 0 - 0
drission_page_process.py → demo/drission_page_process.py


+ 0 - 0
get_article_info.py → demo/get_article_info.py


+ 1 - 1
search_keyward.py → demo/search_keyward.py

@@ -10,7 +10,7 @@ from mylib.base import (replace_space, save_to_file, save_all_result,
                         save_base64_to_file,browser_config)
 from mylib.drission_page import load_chrome_from_ini
 from database.excel_import import ExcelDatabaseManager,KeywordModel
-from mylib.settings import GOOGLE_SEARCH_DIR
+from config.settings import GOOGLE_SEARCH_DIR
 from mylib.crawl_lib_func import filter_links,filter_local_domain
 
 page = load_chrome_from_ini()

+ 1 - 1
mylib/drission_page.py

@@ -1,7 +1,7 @@
 import time
 from DrissionPage import Chromium, ChromiumOptions, ChromiumPage
 from pathlib import Path
-from mylib.settings import OUTPUT_DIR, WORK_DIR, CONFIG_DIR
+from config.settings import OUTPUT_DIR, WORK_DIR, CONFIG_DIR
 from mylib.random_ua import get_random_user_agent
 from fake_useragent import UserAgent
 

+ 1 - 1
mylib/logging_config.py

@@ -2,7 +2,7 @@ import logging
 import sys
 import os
 from pathlib import Path
-from mylib.settings import OUTPUT_DIR, WORK_DIR
+from config.settings import OUTPUT_DIR, WORK_DIR
 
 def _get_relative_path(pathname):
     """将绝对路径转换为相对于项目根目录的相对路径"""

+ 1 - 1
mylib/search_manager.py

@@ -20,7 +20,7 @@ from database.excel_import import ExcelDatabaseManager,KeywordModel
 from database.sqlite_engine import create_db_and_tables, drop_table
 from mylib.drission_page import load_chrome_from_ini,load_random_ua_chrome
 from lxml import html
-from mylib.settings import GOOGLE_SEARCH_DIR
+from config.settings import GOOGLE_SEARCH_DIR
 
 class SearchManager:
     def __init__(self, page: ChromiumPage):

+ 47 - 0
readme.md

@@ -0,0 +1,47 @@
+
+
+
+
+# 工具
+
+## 架构相关
+3.3k ⭐  智能打开 s3 、hdfs 、 sftp 、 ftp 、 local 文件系统
+https://github.com/piskvorky/smart_open
+
+2.2k ⭐  PyFilesystem2 是一个抽象的文件系统接口
+https://github.com/PyFilesystem/pyfilesystem2
+
+
+## 爬虫和解析
+6.6k ⭐  爬虫框架大全
+https://github.com/BruceDone/awesome-crawler
+
+
+
+
+330 ⭐  awesome 网页解析器数据提取大全
+https://github.com/kimtth/awesome-azure-openai-llm/blob/9b16663bb4e38bc8760f3f274b92dfcca0ada34a/section/app.md
+关键词: https://github.com/search?q=Trafilatura+awesome++language%3AMarkdown&type=code&l=Markdown
+
+34.9k ⭐ markitdown
+https://github.com/microsoft/markitdown
+
+22k ⭐ firecrawl AI 抓取干净结构化的页面 
+https://github.com/mendableai/firecrawl
+
+readerLM-v2
+
+3.1k⭐ 网页解析器
+https://github.com/adbar/trafilatura
+各类工具的评估结果,我们可以看看哪些工具最强
+https://trafilatura.readthedocs.io/en/latest/evaluation.html#results-2022-05-18
+
+
+339 ⭐ 文章提取器,这是一个论文和评估基准
+https://github.com/scrapinghub/article-extraction-benchmark
+
+1.4k ⭐  快如闪电的解析器,比 bs4 快240倍
+https://github.com/D4Vinci/Scrapling
+机器人检测的网站,测试用
+https://www.browserscan.net/bot-detection
+"https://bot.sannysoft.com/"

+ 0 - 0
mytest/docling_t.py → tests/mytest/docling_t.py


+ 0 - 0
mytest/get_suport_ua.py → tests/mytest/get_suport_ua.py


+ 0 - 0
mytest/google_search_api.py → tests/mytest/google_search_api.py


+ 0 - 0
mytest/googlesearch_t.py → tests/mytest/googlesearch_t.py


+ 0 - 0
mytest/news_paper_t.py → tests/mytest/news_paper_t.py


+ 0 - 0
mytest/pandoc_t.py → tests/mytest/pandoc_t.py


+ 0 - 0
mytest/playwright_run_path.py → tests/mytest/playwright_run_path.py


+ 0 - 0
mytest/playwright_t.py → tests/mytest/playwright_t.py


+ 33 - 0
tests/mytest/scrapling_t.py

@@ -0,0 +1,33 @@
+"""
+I only made this example to show how Scrapling features can be used to scrape a website without writing any selector
+    so this script doesn't depend on the website structure.
+"""
+
+import requests
+
+from scrapling import Adaptor
+def stackoverflow_demo():
+    response = requests.get('https://stackoverflow.com/questions/tagged/web-scraping?sort=MostVotes&filters=NoAcceptedAnswer&edited=true&pagesize=50&page=2')
+    page = Adaptor(response.text, url=response.url)
+    # First we will extract the first question title and its author based on the text content
+    first_question_title = page.find_by_text('Run Selenium Python Script on Remote Server')
+    first_question_author = page.find_by_text('Ryan')
+    # because this page changes a lot
+    if first_question_title and first_question_author:
+        # If you want you can extract other questions tags like below
+        first_question = first_question_title.find_ancestor(
+            lambda ancestor: ancestor.attrib.get('id') and 'question-summary' in ancestor.attrib.get('id')
+        )
+        rest_of_questions = first_question.find_similar()
+        # But since nothing to rely on to extract other titles/authors from these elements without CSS/XPath selectors due to the website nature
+        # We will get all the rest of the titles/authors in the page depending on the first title and the first author we got above as a starting point
+        for i, (title, author) in enumerate(zip(first_question_title.find_similar(), first_question_author.find_similar()), start=1):
+            print(i, title.text, author.text)
+
+def google_search_demo():
+    search_key = "python"
+def main():
+    google_search_demo()
+
+if __name__ == "__main__":
+    main()

+ 0 - 0
mytest/t.py → tests/mytest/t.py


+ 0 - 0
mytest/test_fake_ua.py → tests/mytest/test_fake_ua.py


+ 15 - 0
tests/mytest/trafilatura_html.py

@@ -0,0 +1,15 @@
+from trafilatura import fetch_url, extract
+from pathlib import Path
+from mylib.base import save_to_file
+file = Path(r'K:\code\upwork\zhang_crawl_bio\output\google_search\Acalypha manniana essential oil\10.html')
+# downloaded = fetch_url()
+downloaded = file.read_text(encoding='utf-8')
+result = extract(downloaded, output_format="xml", with_metadata=True)
+print(f"result: {result}")
+# define output format :"csv", "html", "json", "markdown", "txt", "xml", and "xmltei".
+output_format_list = ["csv", "html", "json", "markdown", "txt", "xml", "xmltei"]
+for output_format in output_format_list:
+    result = extract(downloaded, output_format=output_format, with_metadata=True)
+    save_path = save_to_file(result, 'ext' / file.with_suffix(f'.{output_format}'))
+    print(f"save_path: {save_path}")
+# save_to_file(result, file.with_suffix('.xml'))

+ 0 - 0
mytest/xpath_search.py → tests/mytest/xpath_search.py