Browse Source

Complete crawl4ai batch crawling with a custom browser

mrh · 1 year ago
parent commit c08a17c95f

+ 3 - 3
mylib/drission_page.py

@@ -19,7 +19,7 @@ def genarate_chrome_ini(address="localhost:9321"):
     # chrome_options.incognito(True)
     path = chrome_options.save(CONFIG_DIR / f'{port}.ini')
     return path
-def load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True):
+def load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True, auto_port=True):
     chrome_options = ChromiumOptions(ini_path=path)
     if browser_path:
         chrome_options.set_browser_path(browser_path)
@@ -28,7 +28,7 @@ def load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=Non
     # if a proxy environment variable is set, use it
     elif 'HTTP_PROXY' in os.environ:
         chrome_options.set_proxy(os.environ['HTTP_PROXY'])
-    chrome_options.auto_port(True)
+    chrome_options.auto_port(auto_port)
     chrome_options.no_imgs(no_imgs)
     logger.info(f"proxy {proxy}")
     page = ChromiumPage(chrome_options)
@@ -78,7 +78,7 @@ def test_random_ua_chrome():
 
 def test_normal_chrome():
     # genarate_chrome_ini()
-    page = load_chrome_from_ini(proxy='http://localhost:9356')
+    page = load_chrome_from_ini(proxy='http://localhost:1881')
     tab = page.latest_tab
     keyword = "Acalypha rivularis essential oil"
     url = f"https://www.google.com/search?q={keyword}"
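The new `auto_port` flag lets callers decide whether DrissionPage picks a free debugging port per launch or keeps the fixed port from the `.ini`. A minimal usage sketch (the proxy address is illustrative, not taken from the repo):

```python
# Minimal usage sketch of the updated helper; the proxy address is illustrative.
from mylib.drission_page import load_chrome_from_ini

# auto_port=False keeps the port configured in the .ini (e.g. 9321) so other tools can
# attach to the same browser; auto_port=True (the default) picks a free port per launch.
page = load_chrome_from_ini(proxy="http://localhost:1881", no_imgs=True, auto_port=False)
tab = page.latest_tab
tab.get("https://www.google.com/search?q=Acalypha+rivularis+essential+oil")
print(tab.title)
page.quit()
```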

+ 4 - 4
poetry.lock

@@ -836,14 +836,14 @@ files = [
 
 [[package]]
 name = "crawl4ai"
-version = "0.4.248"
+version = "0.4.3b3"
 description = "🚀🤖 Crawl4AI: Open-source LLM Friendly Web Crawler & scraper"
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "Crawl4AI-0.4.248-py3-none-any.whl", hash = "sha256:b98eed27d0215cfe69faa59e922b04e131c8683a07def08097075140ec71a549"},
-    {file = "crawl4ai-0.4.248.tar.gz", hash = "sha256:ae0da8d4bbde09a9985a9f5797ed351172adc82df1266cf55ef10526abf66e46"},
+    {file = "Crawl4AI-0.4.3b3-py3-none-any.whl", hash = "sha256:125decb0a1949f236879ca873204590db9818be540cef0e9f16444a3d9ad50c3"},
+    {file = "crawl4ai-0.4.3b3.tar.gz", hash = "sha256:5f5b89fbdcf58a7bd450dd7dd1b08a3fa9eb082936588f04947a6867b4942c87"},
 ]
 
 [package.dependencies]
@@ -6247,4 +6247,4 @@ cffi = ["cffi (>=1.11)"]
 [metadata]
 lock-version = "2.1"
 python-versions = "3.12"
-content-hash = "a12e5cd950b04e0b979696be00ec81b04b6baff82ac60c4f9d9bb74017c5dabf"
+content-hash = "8e7e98071910481d87f2c698eade5544d8fa0c84c8b7b64120a243ccbea6dd11"

+ 2 - 1
pyproject.toml

@@ -12,7 +12,7 @@ dependencies = [
     "scrapling (>=0.2.93,<0.3.0)",
     "sqlmodel (>=0.0.22,<0.0.23)",
     "drissionpage (>=4.1.0.17,<5.0.0.0)",
-    "crawl4ai (>=0.4.248,<0.5.0)",
+    "crawl4ai (==0.4.3b3)",
     "redis (>=5.2.1,<6.0.0)",
     "celery (>=5.4.0,<6.0.0)",
     "flower (>=2.0.1,<3.0.0)",
@@ -20,6 +20,7 @@ dependencies = [
     "fastapi (>=0.115.10,<0.116.0)",
     "cachetools (>=5.5.2,<6.0.0)",
     "uvicorn (>=0.34.0,<0.35.0)",
+    "playwright (>=1.50.0,<2.0.0)",
 ]
 
 

File diff suppressed because it is too large
+ 62 - 0
tests/mytest/crawl_t.py


+ 6 - 0
ui/backend/routers/worker.py

@@ -180,6 +180,12 @@ async def ctrl_worker(request: StartupRequest):
             select_proxy=request.select_proxy
         )
         return {"status": "success", "res": res}
+    elif request.action == "pause":
+        await celery_worker.pause_worker(request.worker_name)
+        return {"status": "success"}
+    elif request.action == "resume":
+        await celery_worker.resume_worker(request.worker_name)
+        return {"status": "success"}
     else:
         logger.error(f"Invalid action: {request.action}")
         raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")
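A minimal client-side sketch of the new `pause`/`resume` actions. The backend address and route prefix are assumptions (the router's mount point is not shown in this diff), and `StartupRequest` may accept more fields than the two used here:

```python
# Hedged sketch: the URL below is hypothetical; only worker_name/action are sent.
import httpx

CTRL_URL = "http://127.0.0.1:8000/api/worker/ctrl"  # hypothetical mount point

def ctrl_worker(worker_name: str, action: str) -> dict:
    # The endpoint dispatches on `action`; unsupported actions return HTTP 400.
    resp = httpx.post(CTRL_URL, json={"worker_name": worker_name, "action": action})
    resp.raise_for_status()
    return resp.json()

ctrl_worker("search", "pause")   # worker keeps running but stops fetching new tasks
ctrl_worker("search", "resume")  # worker starts fetching tasks again
```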

+ 25 - 1
ui/backend/src/services/celery_worker.py

@@ -45,7 +45,7 @@ class CrawlBrowserConfig(BaseModel):
     proxy: str | None = None
 
 class CrawlTaskConfig(BaseModel):
-    browser_config: Optional[CrawlBrowserConfig] = None
+    browser_config: Optional[SearchBrowserConfig] = None
     overwrite: Optional[bool] = False
     dry_run: Optional[bool] = False
     proxy_pool_url: Optional[str] = None
@@ -97,6 +97,7 @@ class CeleryWorker:
                 
                 workers_data = worker_status.get("workers", {}).get("data", [])
                 
+                logger.info(f"wait ... worker {len(workers_data)}")
                 # Check if our worker is in the list
                 for worker in workers_data:
                     if worker["hostname"].startswith(f"{name}@"):
@@ -179,6 +180,28 @@ class CeleryWorker:
             except psutil.TimeoutExpired:
                 await asyncio.to_thread(proc.kill)               # force-kill asynchronously
                 logger.warning(f"{worker_model} 进程 {worker_model.pid} 强制终止")
+
+    async def pause_worker(self, name: str):
+        """
+        Pause task execution for the given worker.
+        (deprecated) pause and resume only keep the worker from fetching new tasks; they cannot stop tasks that are already running
+        """
+        worker_model = self.workers_model.get(name)
+        if not worker_model:
+            raise ValueError(f"Invalid worker name: {name}")
+        self.celery_app.control.broadcast('pause', destination=[worker_model.queue_name])
+        logger.info(f"已发送暂停指令到 {worker_model.queue_name}")
+
+    async def resume_worker(self, name: str):
+        """
+        Resume task execution for the given worker.
+        (deprecated) pause and resume only keep the worker from fetching new tasks; they cannot stop tasks that are already running
+        """
+        worker_model = self.workers_model.get(name)
+        if not worker_model:
+            raise ValueError(f"Invalid worker name: {name}")
+        self.celery_app.control.broadcast('resume', destination=[worker_model.queue_name])
+        logger.info(f"已发送恢复指令到 {worker_model.queue_name}")
     
     async def check_worker_status(self) -> Dict[str, Any]:
         flower_url = "http://127.0.0.1:5555/workers?json=1"
@@ -265,6 +288,7 @@ class CeleryWorker:
             return task_model
     def _prepare_crawl_task(self, data: Dict, select_proxy: Optional[str] = None):
             task_model = CrawlTaskConfig(**data)
+            task_model.browser_config.browser_path = self.config.browser.exe_path
             if select_proxy == 'pool':
                 task_model.proxy_pool_url = f"http://{self.config.backend.host}:{self.config.backend.port}/api/proxy/proxies-pool"
             return task_model                           
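As the (deprecated) docstrings note, pausing only affects new task consumption. Celery does not ship `pause`/`resume` remote-control commands out of the box, so the `control.broadcast(...)` calls above assume the workers register matching custom commands; note also that `destination` is documented as a list of worker node names (`name@host`). A minimal sketch of the same "stop fetching new tasks" effect using the built-in control API (the hostname, queue name, and broker URL are illustrative):

```python
# Minimal sketch; worker hostname, queue name, and broker URL are illustrative.
from celery import Celery

app = Celery(broker="redis://localhost:6379/0")  # hypothetical broker URL

def pause_queue(worker_hostname: str, queue_name: str) -> None:
    # The worker stays alive but stops consuming new tasks from this queue;
    # tasks already reserved or running are not interrupted.
    app.control.cancel_consumer(queue_name, destination=[worker_hostname])

def resume_queue(worker_hostname: str, queue_name: str) -> None:
    # The worker resumes consuming the queue.
    app.control.add_consumer(queue_name, destination=[worker_hostname])

pause_queue("search@myhost", "search_queue")
resume_queue("search@myhost", "search_queue")
```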

+ 1 - 9
ui/backend/tests/mytests/t.py

@@ -20,12 +20,4 @@ class WorkerModel(BaseModel):
 
 # test cases
 if __name__ == "__main__":
-    worker1 = WorkerModel(name="worker1", cmd=["python", "app.py"])
-    print(worker1.queue_name)  # 输出: worker1_queue
-
-    worker2 = WorkerModel(
-        name="worker2", 
-        queue_name="custom_queue", 
-        cmd=["python", "app.py"]
-    )
-    print(worker2.queue_name)  # 输出: custom_queue
+    worker = WorkerModel(name="test_worker", cmd=["python", "worker.py"])

+ 26 - 5
ui/fontend/src/components/WorkerCtrl.vue

@@ -18,14 +18,24 @@
             <el-space direction="vertical" alignment="start" :size="15">
             <el-row><span >浏览器搜索任务:{{ taskCounts.search }}</span></el-row>
             <el-row>
-              <el-button type="primary" 
+              <el-button type="primary"
                 :disabled="workerStatus.search || loadingStates.search"
                 :loading="loadingStates.search"
                 @click="sendRequest('search', 'start')">启动窗口</el-button>
-              <el-button type="primary" 
+              <el-button type="primary"
                 :disabled="!workerStatus.search || loadingStates.search"
                 :loading="loadingStates.search"
                 @click="sendRequest('search', 'stop')">停止运行</el-button>
+              <el-button type="primary"
+                :disabled="!workerStatus.search || loadingStates.search"
+                @click="sendRequest('search', workerStatus.search_paused ? 'resume' : 'pause')">
+                {{ workerStatus.search_paused ? '恢复运行' : '暂停运行' }}
+              </el-button>
+              <el-button type="primary"
+                :disabled="!workerStatus.search || loadingStates.search"
+                @click="sendRequest('search', workerStatus.search_paused ? 'resume' : 'pause')">
+                {{ workerStatus.search_paused ? '恢复运行' : '暂停运行' }}
+              </el-button>
 
             </el-row>
             <el-row>
@@ -83,14 +93,18 @@
           <el-card shadow="hover">
             <div class="task-container">
               <span class="task-label">将结果页转换成文档:</span>
-              <el-button type="primary" class="task-button"
+              <el-button type="primary" 
                 :disabled="workerStatus.convert || loadingStates.convert"
                 :loading="loadingStates.convert"
                 @click="sendRequest('convert', 'start')">启动窗口</el-button>
-              <el-button type="primary" class="task-button"
+              <el-button type="primary" 
                 :disabled="!workerStatus.convert || loadingStates.convert"
                 :loading="loadingStates.convert"
                 @click="sendRequest('convert', 'stop')">停止运行</el-button>
+                <el-button type="primary" 
+                :disabled="!workerStatus.convert || loadingStates.convert"
+                :loading="loadingStates.convert"
+                @click="sendRequest('convert', 'submit_all')">执行文档转换</el-button>
             </div>
           </el-card>
         </el-col>
@@ -124,7 +138,10 @@ const workerStatus = computed(() => {
   const status: Record<string, boolean> = {
     search: false,
     crawl: false,
-    convert: false
+    convert: false,
+    search_paused: false,
+    crawl_paused: false,
+    convert_paused: false
   }
 
   workers.value.forEach((worker: any) => {
@@ -134,6 +151,7 @@ const workerStatus = computed(() => {
     const [workerType] = worker.hostname.split('@')  // parse the worker type (search/crawl/convert) from the hostname
     if (workerType in status) {
       status[workerType] = worker.status
+      status[`${workerType}_paused`] = worker.consumer_status?.paused || false
     }
   })
   // console.log('Worker Status:', workerStatus.value)
@@ -201,6 +219,7 @@ const sendRequest = async (workerName: string, action: string) => {
             }
           }: {}),
           ...(workerName === 'crawl' ? {
+            browser_config: {},
             overwrite: false,
             dry_run: dryRun.value,
             proxy_pool_url: store.selectedProxy
@@ -224,6 +243,8 @@ const sendRequest = async (workerName: string, action: string) => {
     const actionMap: Record<string, string> = {
       start: '启动',
       stop: '停止',
+      pause: '暂停',
+      resume: '恢复',
       submit_all: '提交所有任务',
       clean: '清空任务',
       submit: '提交搜索'

+ 5 - 2
utils/proxy_pool.py

@@ -3,8 +3,11 @@ from config.settings import PROXIES
 import httpx
 def get_proxy_pool(proxy_pool_url) -> list:
     res = httpx.get(proxy_pool_url)
-    return res.json().get("proxies")
-
+    pool_list = res.json().get("proxies")
+    if not pool_list:
+        res = httpx.get(proxy_pool_url + '?force_refresh=true')
+        pool_list = res.json().get("proxies")
+    return pool_list
 def get_random_proxy(proxy_pool_url: str=None) -> str:
     if proxy_pool_url:
         pool = get_proxy_pool(proxy_pool_url)
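A minimal usage sketch; the pool endpoint mirrors the `proxy_pool_url` that `celery_worker.py` injects into the task config, with a hypothetical host and port:

```python
# Minimal usage sketch; host and port are illustrative.
from utils.proxy_pool import get_random_proxy

pool_url = "http://127.0.0.1:8000/api/proxy/proxies-pool"
# get_proxy_pool() now retries once with ?force_refresh=true when the pool comes back empty.
proxy = get_random_proxy(pool_url)
print(proxy)
```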

+ 5 - 1
worker/api/excel_load.py

@@ -6,6 +6,7 @@ import pandas as pd
 from typing import List
 from sqlmodel import select, SQLModel, Session
 from mylib.logu import logger
+from config.settings import GOOGLE_SEARCH_DIR
 from worker.search_engine.search_result_db import SearchResultManager, KeywordTask,db_manager
 from fastapi.responses import JSONResponse
 from io import BytesIO
@@ -96,7 +97,10 @@ def delete_keyword(keyword: str):
     try:
         ret = db_manager.delete_keyword_task(keyword)
         if ret:
-            return {"err":0, "status": "success","model":ret, "message": f"Keyword '{keyword}' deleted successfully"}
+            keyword_dir = Path(GOOGLE_SEARCH_DIR / ret.get('keyword'))
+            if not keyword_dir.exists():
+                keyword_dir = ''
+            return {"err":0, "status": "success","model":ret, "message": f"Keyword '{keyword}' deleted successfully", "keyword_dir": str(keyword_dir)}
         else:
             return {"err":1, "status": "success", "message": f"Keyword '{keyword}' not found"}
     except Exception as e:

+ 34 - 16
worker/celery/crawl_tasks.py

@@ -9,7 +9,16 @@ from mylib.logu import get_logger
 from config.settings import PROXIES
 from crawl4ai import BrowserConfig
 from pydantic import BaseModel
+from mylib.drission_page import load_random_ua_chrome, load_chrome_from_ini
+from utils.proxy_pool import get_random_proxy
+from worker.search_engine.search_result_db import SearchResultItem
 logger = get_logger('crawl_worker')
+class SearchBrowserConfig(BaseModel):
+    # def load_chrome_from_ini(path=CONFIG_DIR / '9321.ini', headless=False, proxy=None, browser_path=None, no_imgs=True):
+    headless: Optional[bool] = False
+    proxy: Optional[str] = None
+    browser_path: Optional[str] = None
+    no_imgs: Optional[bool] = True
 
 class CrawlBrowserConfig(BaseModel):
     headless: bool = True
@@ -18,7 +27,7 @@ class CrawlBrowserConfig(BaseModel):
     proxy: str | None = None
 
 class CrawlTaskConfig(BaseModel):
-    browser_config: Optional[CrawlBrowserConfig] = None
+    browser_config: Optional[SearchBrowserConfig] = None
     overwrite: Optional[bool] = False
     dry_run: Optional[bool] = False
     proxy_pool_url: Optional[str] = None
@@ -31,12 +40,11 @@ class CrawlTaskConfig(BaseModel):
 )
 def crawl_all_unprocessed_pages_task(config: dict|CrawlTaskConfig):
     """异步任务:提交所有未处理的页面URL爬取任务"""
-    from worker.search_engine.search_result_db import SearchResultManager
+    from worker.search_engine.search_result_db import db_manager
     
     config = CrawlTaskConfig(**config)
     try:
-        manager = SearchResultManager()
-        page_ids = manager.get_pages_with_unprocessed_urls()
+        page_ids = db_manager.get_pages_with_unprocessed_urls()
         
         if not page_ids:
             logger.info("没有未处理的页面需要爬取")
@@ -98,24 +106,32 @@ def crawl_page_urls_task(page_id: int, config: dict):
     
     async def _execute_crawl():
         try:
-            # Convert to BrowserConfig
-            browser_config = BrowserConfig(**(crawl_task_config.browser_config or {}))
+            search_browser_config = SearchBrowserConfig(**(crawl_task_config.browser_config.model_dump() or {}))
+            if crawl_task_config.proxy_pool_url:
+                proxy = await asyncio.to_thread(get_random_proxy, crawl_task_config.proxy_pool_url)
+                search_browser_config.proxy = proxy
+                logger.info(f"使用代理池: {crawl_task_config.proxy_pool_url}  -->  {proxy}")
+            else:
+                logger.info(f"使用代理: 跟随系统")
+            page = load_chrome_from_ini(**search_browser_config.model_dump()) if search_browser_config else load_chrome_from_ini()
             
+            crawl_browser_config = BrowserConfig(
+                headless=search_browser_config.headless,
+                use_managed_browser=True,
+                cdp_url=page.browser._driver._websocket_url
+            )
             logger.info(f"{"(测试模式)" if crawl_task_config.dry_run else ""}开始提取搜索结果页: {page_id}")
             crawler = URLCrawler()
-            if crawl_task_config.proxy_pool_url:
-                browser_config.proxy = await crawler.get_random_proxy(crawl_task_config.proxy_pool_url)
-            logger.info(f"BrowserConfig proxy: {browser_config.proxy}")
-            save_dir, res = await crawler.crawl_page_urls(page_id, browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)
+            save_dir, list_res = await crawler.crawl_page_urls(page_id, crawl_browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)
             files = []
-            if res:
-                for crawl_ret in res:
+            # logger.info(f"list_res {list_res} ")
+            if list_res:
+                for crawl_ret in list_res:
                     if not isinstance(crawl_ret, dict):
                         continue
-                    search_result_model = crawl_ret.get("search_result_model")
+                    search_result_model:SearchResultItem = crawl_ret.get("search_result_model")
                     if search_result_model:
-                        save_file_path = crawl_ret.get("html_path")
-                        files.append(save_file_path)
+                        files.append(search_result_model.html_path)
             ret = {"page_id": page_id, "status": "completed", "save_dir": save_dir, "files":files}
             logger.info(f"保存目录: {save_dir}")
             logger.info(f"提取到 {len(files)} 个结果: {files}")
@@ -124,7 +140,9 @@ def crawl_page_urls_task(page_id: int, config: dict):
         except Exception as e:
             logger.error(f"URL crawl task failed for page ID {page_id}: {str(e)}")
             raise
-
+        finally:
+            if locals().get("page"):
+                page.quit()
     loop = asyncio.new_event_loop()
     asyncio.set_event_loop(loop)
     try:
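This is the core of the commit: instead of letting crawl4ai launch its own Playwright browser, the task starts Chrome through DrissionPage (optionally with a proxy from the pool) and attaches crawl4ai to it over CDP. A minimal standalone sketch of the same pattern; the private attribute `page.browser._driver._websocket_url` is taken from the diff above and is a DrissionPage implementation detail, so treat it as an assumption:

```python
# Minimal sketch of attaching crawl4ai to a DrissionPage-launched Chrome over CDP.
import asyncio

from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode, CrawlerRunConfig
from mylib.drission_page import load_chrome_from_ini

async def crawl_with_existing_browser(url: str):
    page = load_chrome_from_ini(headless=False)  # Chrome launched/controlled by DrissionPage
    try:
        browser_config = BrowserConfig(
            headless=False,
            use_managed_browser=True,                     # attach, don't launch a new browser
            cdp_url=page.browser._driver._websocket_url,  # private attribute, per the diff above
        )
        async with AsyncWebCrawler(config=browser_config) as crawler:
            result = await crawler.arun(url=url, config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS))
            return result.markdown
    finally:
        page.quit()  # always release the DrissionPage browser

if __name__ == "__main__":
    print(asyncio.run(crawl_with_existing_browser("https://example.com")))
```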

+ 7 - 2
worker/celery/html_convert_tasks.py

@@ -1,4 +1,6 @@
+from typing import List, Optional
 from celery import current_app
+from pydantic import BaseModel, Field
 from worker.html_convert.pandoc import process_single_example, process_all_results
 from mylib.logu import get_logger
 from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
@@ -6,6 +8,9 @@ from sqlmodel import Session, select
 from worker.search_engine.valid_google_search import ValidSearchResult
 
 logger = get_logger('pandoc_tasks')
+class ConvertTaskParams(BaseModel):
+    result_ids: List[str] = Field(..., min_length=1)
+    batch_size: Optional[int] = Field(10, gt=0)
 
 @current_app.task(name='html_convert_tasks.convert_single_result')
 def convert_single_result_task(result_id: int):
@@ -28,7 +33,7 @@ def convert_single_result_task(result_id: int):
         return {"result_id": result_id, "status": "failed"}
 
 @current_app.task(name='html_convert_tasks.convert_all_results')
-def convert_all_results_task():
+def convert_all_results_task(input_params: ConvertTaskParams=None):
     """
     Celery task to convert all SearchResultItems using Pandoc.
     
@@ -37,7 +42,7 @@ def convert_all_results_task():
     """
     try:
         logger.info("Starting Pandoc conversion for all SearchResultItems")
-        process_all_results()
+        test_task_process_all_results()
         logger.info("Pandoc conversion completed for all SearchResultItems")
         return {"status": "completed"}
     except Exception as e:
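The new `ConvertTaskParams` model validates conversion batches before they reach the task body; since Celery serializes task arguments to JSON, the usual pattern in this repo is to pass a plain dict and re-validate it inside the task. A minimal sketch (the IDs and batch size are illustrative):

```python
# Minimal validation sketch; values are illustrative.
from pydantic import ValidationError
from worker.celery.html_convert_tasks import ConvertTaskParams

try:
    params = ConvertTaskParams(result_ids=["101", "102"], batch_size=5)
except ValidationError as exc:
    # Raised when result_ids is empty or batch_size <= 0.
    print(exc)
else:
    print(params.model_dump())
```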

+ 18 - 10
worker/crawl_pages/crawl_urls.py

@@ -66,18 +66,22 @@ class URLCrawler:
                 extra_args=["--disable-gpu", "--disable-dev-shm-usage", "--no-sandbox"],
             )
         
-        crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
         
         logger.info(f"Crawling id: {item_id} URL: {url}")
-        
+        content_type = ''
         # First try to detect if it's a PDF by checking headers
         async with httpx.AsyncClient(verify=self.ssl_context) as client:
             try:
-                response = await client.head(url)
-                content_type = response.headers.get('content-type', '').lower()
-                logger.info(f"crwal id {item_id} content_type {content_type}")
-                if 'pdf' in content_type:
+                pdf_path = ''
+                if url.endswith('.pdf'):
                     pdf_path = output_dir / f"{item_id}.pdf"
+                else:
+                    response = await client.head(url)
+                    content_type = response.headers.get('content-type', '').lower()
+                    if 'pdf' in content_type:
+                        pdf_path = output_dir / f"{item_id}.pdf"
+                logger.info(f"crwal id {item_id} content_type {content_type} {pdf_path}")
+                if pdf_path:
                     if await self.download_pdf(url, pdf_path):
                         # Update database with PDF path
                         with Session(self.db_manager.engine) as session:
@@ -90,16 +94,18 @@ class URLCrawler:
                                 session.add(item)
                                 session.commit()
                         return {"search_result_model": item, "crawl_result": None, 'message': response.headers.get('content-type')}
-                elif 'text/csv' in content_type:
-                    return {"search_result_model": item, "crawl_result": None,'message': f'skip {content_type} id {item_id}'}
             except Exception as e:
                 logger.warning(f"Failed to check headers for id: {item_id} , {url} {str(e)}")
-            
+                return {"search_result_model": None, "crawl_result": None, 'message': str(e)}
+        if 'html' not in content_type:
+            return {"search_result_model": None, "crawl_result": None,'message': f'not html, content_type {content_type}'}
+        logger.info(f"crawler.arun start {item_id} content-type: {content_type}, {url} ")    
         # If not PDF or header check failed, try regular crawl
         crawler = AsyncWebCrawler(config=browser_config)
         await crawler.start()
         
         try:
+            crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
             result:CrawlResult = await crawler.arun(url=url, config=crawl_config)
             logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
             # If crawl failed but URL contains 'download', try PDF again
@@ -151,7 +157,9 @@ class URLCrawler:
                         session.refresh(item)
                 
             return {"search_result_model": item, "crawl_result": result}
-            
+        except Exception as e:
+            logger.error(f"Failed to crawl id: {item_id} , {url} {str(e)}")
+            return {"search_result_model": item, "crawl_result": None, 'message': str(e)}    
         finally:
             await crawler.close()
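The URL probe above now skips the HEAD request for obvious `.pdf` URLs and bails out early on non-HTML content. A minimal standalone sketch of that check (the output path layout is illustrative):

```python
# Minimal standalone sketch of the PDF probe; output_dir / item_id naming is illustrative.
from pathlib import Path
import httpx

async def resolve_pdf_path(client: httpx.AsyncClient, url: str, output_dir: Path, item_id: int) -> Path | None:
    """Return the target PDF path when the URL looks like a PDF, otherwise None."""
    if url.endswith(".pdf"):                              # cheap check first, no network round-trip
        return output_dir / f"{item_id}.pdf"
    response = await client.head(url)                     # otherwise ask the server for headers
    content_type = response.headers.get("content-type", "").lower()
    return output_dir / f"{item_id}.pdf" if "pdf" in content_type else None
```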
 

+ 3 - 3
worker/html_convert/pandoc.py

@@ -8,7 +8,7 @@ import ssl
 import os
 from sqlmodel import select, Session
 from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
-from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
+from worker.search_engine.search_result_db import SearchResultManager, db_manager, SearchPageResult, SearchResultItem
 from mylib.base import ensure_output_dir, save_to_file, load_from_pickle
 from mylib.logu import get_logger
 from config.settings import PANDOC_EXE, HTTP_PROXY, HTTPS_PROXY
@@ -194,7 +194,7 @@ def process_single_example(result_id: int, skip_existing=True):
     return success
 def process_all_results():
     # Process all results in the database
-    db_manager = SearchResultManager()
+    global db_manager
     with Session(db_manager.engine) as session:
         # Fetch all IDs with explicit ordering
         result_ids = session.exec(select(SearchResultItem.id, SearchResultItem.html_path).order_by(SearchResultItem.id)).all()
@@ -203,7 +203,7 @@ def process_all_results():
         
         for result_id, html_path in result_ids:
             try:
-                if html_path.endswith('.html'):
+                if html_path and html_path.endswith('.html'):
                     process_single_example(result_id)
             except Exception as e:
                 logger.error(f"Error processing result {result_id}: {e}")

Some files were not shown because too many files have changed in this diff