
Improve crawl4ai's PDF download and save logic: add a content_type database column, rename html_path to save_path (results can now be saved as either PDF or HTML), and add a markdown_path field.

mrh · 9 months ago · commit cee5551228
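
For orientation, a minimal sketch of how the reworked SearchResultItem columns are intended to be filled after a crawl. This is an assumed usage example, not code from the commit; the URL, IDs, and paths are placeholders. content_type records the response's Content-Type header, save_path points at the saved .pdf or .html file, and markdown_path at the extracted Markdown.

    # Assumed usage sketch of the new columns (content_type, save_path, markdown_path);
    # SearchResultItem and SearchResultManager are from worker/search_engine/search_result_db.py.
    from pathlib import Path
    from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager

    db_manager = SearchResultManager()
    item = SearchResultItem(
        id=1,                                      # placeholder primary key
        url="https://example.com/paper.pdf",       # placeholder URL
        keyword_id=1,                              # placeholder foreign keys
        keyword="example",
        page_id=1,
        content_type="application/pdf",            # taken from the response Content-Type header
        save_path=str(Path("output") / "1.pdf"),   # a PDF or HTML file, depending on content_type
        markdown_path=None,                        # only set when HTML is crawled and Markdown is produced
    )
    db_manager.add_or_update_search_result_item(item)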

+ 2 - 2
config/settings.py

@@ -5,14 +5,14 @@ import subprocess
 WORK_DIR = Path(__file__).parent.parent.absolute()
 OUTPUT_DIR = WORK_DIR / "output"
 CONFIG_DIR = WORK_DIR / "config" / "conf"
-GOOGLE_SEARCH_DIR = OUTPUT_DIR / 'results'
+GOOGLE_SEARCH_DIR = OUTPUT_DIR / 'results1'
 PANDOC_EXE = os.environ.get('PANDOC_EXE') or shutil.which('pandoc')
 
 LOG_LEVEL='info'
 LOG_DIR = OUTPUT_DIR / "logs"
 
 # DB_URL = f"sqlite:///{OUTPUT_DIR}/search_results.db"
-DB_URL = os.environ.get('DB_URL') or f"sqlite:///{OUTPUT_DIR}/temp.db"
+DB_URL = os.environ.get('DB_URL') or f"sqlite:///{OUTPUT_DIR}/temp1.db"
 
 HTTP_PROXY='http://127.0.0.1:1881'
 HTTPS_PROXY='http://127.0.0.1:1881'

+ 3 - 2
tests/mytest/crawl_t.py

@@ -40,6 +40,7 @@ async def task():
     item_id = 1
     # url = 'https://greg.app/acalypha-marissima-overview/'
     url = 'https://fr.florame.com/en/essential-oils'
+    # url = 'https://repository.arizona.edu/bitstream/10150/550946/1/dp_04_01-04.pdf'
     # url = 'https://baidu.com'
     browser_config = BrowserConfig(
         headless=False,
@@ -59,9 +60,9 @@ async def task():
 
     crawler = AsyncWebCrawler(config=browser_config)
     await crawler.start()
-    crawl_config = CrawlerRunConfig(cache_mode=CacheMode.DISABLED)
+    crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
     result:CrawlResult = await crawler.arun(url=url, config=crawl_config)
-    logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
+    logger.info(f"{item_id} crawler.arun result: {result.model_dump_json(indent=2)}")
     print(result.markdown)
     input('press enter to continue')
     await crawler.close()

+ 1 - 1
ui/backend/config.yaml

@@ -12,7 +12,7 @@ redis:
   host: localhost
   port: 6379
 select_proxy: system
-sqluri: sqlite:///G:\code\upwork\zhang_crawl_bio\output\temp.db
+sqluri: sqlite:///G:\code\upwork\zhang_crawl_bio\output\temp1.db
 sub:
   auto_start: true
   file: g:\code\upwork\zhang_crawl_bio\download\proxy_pool\6137e542.yaml

+ 2 - 1
worker/celery/crawl_tasks.py

@@ -118,6 +118,8 @@ def crawl_page_urls_task(page_id: int, config: dict):
     if sys.platform == 'win32':
         asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
     
+    logger.info(f"{"(测试模式)" if crawl_task_config.dry_run else ""}开始提取搜索结果页: {page_id}")
+    logger.info(f"config {config}")
     async def _execute_crawl():
         try:
             search_browser_config = SearchBrowserConfig(**(crawl_task_config.browser_config.model_dump() or {}))
@@ -134,7 +136,6 @@ def crawl_page_urls_task(page_id: int, config: dict):
                 use_managed_browser=True,
                 cdp_url=page.browser._driver._websocket_url
             )
-            logger.info(f"{"(测试模式)" if crawl_task_config.dry_run else ""}开始提取搜索结果页: {page_id}")
             crawler = URLCrawler()
             save_dir, list_res = await crawler.crawl_page_urls(page_id, crawl_browser_config, crawl_task_config.overwrite, crawl_task_config.dry_run)
             files = []

+ 10 - 3
worker/celery/html_convert_tasks.py

@@ -6,6 +6,7 @@ from mylib.logu import get_logger
 from worker.search_engine.search_result_db import SearchResultItem, SearchResultManager
 from sqlmodel import Session, select
 from worker.search_engine.valid_google_search import ValidSearchResult
+from worker.html_convert.crawl_filter import CrawlFilter
 
 logger = get_logger('pandoc_tasks')
 class ConvertTaskParams(BaseModel):
@@ -73,13 +74,14 @@ def test_task_process_all_results():
         valid_items = valid_search.get_valid_search_result_items()
         
         logger.info(f"找到 {len(valid_items)} 个有效结果,开始批量提交...")
-        
+        for item in valid_items[:5]:
+            logger.info(f"开始提交转换任务,结果ID: {item}")
         # 创建任务参数
         params = ConvertTaskParams(
             result_ids=[str(item.id) for item in valid_items],
             queue_name='convert_queue'
         )
-        
+        return
         # 调用转换任务
         result = convert_all_results_task(params)
         logger.info(f"批量提交完成,任务ID: {result.get('task_ids', [])}")
@@ -98,7 +100,12 @@ def clear_existing_tasks():
 
 def main():
     # test_task_process_all_results()
-    clear_existing_tasks()
+    c = CrawlFilter()
+    res = c.db_manager.get_complete_search_result_items()
+    for item in res[:5]:
+        logger.info(f"{item}")
+    logger.info(f"{len(res)}")
+    # clear_existing_tasks()
     pass
 
 if __name__ == "__main__":

+ 44 - 55
worker/crawl_pages/crawl_urls.py

@@ -2,9 +2,10 @@ import asyncio
 import pickle
 from pathlib import Path
 import random
-from typing import List
+from typing import List, Optional
 import httpx
 import ssl
+from pydantic import BaseModel, Field
 from sqlmodel import select, Session
 from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, CrawlResult
 from worker.search_engine.search_result_db import SearchResultManager, KeywordTask, SearchPageResult, SearchResultItem
@@ -12,6 +13,12 @@ from mylib.base import ensure_output_dir, save_to_file,load_from_pickle
 from mylib.logu import logger
 from utils.proxy_pool import get_random_proxy
 
+class CrawlerResult(BaseModel):
+    err: Optional[int] = 1
+    search_result_model: Optional[SearchResultItem] = None
+    crawl_result: Optional[CrawlResult] = None
+    message: Optional[str] = None
+
 class URLCrawler:
     def __init__(self, max_concurrent: int = 3):
         self.max_concurrent = max_concurrent
@@ -49,15 +56,16 @@ class URLCrawler:
     async def crawl_url(self, url: str, item_id: int, output_dir: Path, browser_config: BrowserConfig = None, overwrite: bool = False) -> CrawlResult:
         """Crawl a single URL and save results with item_id as filename"""
         # Check if we should skip this URL
+        item = None
         with Session(self.db_manager.engine) as session:
             item = session.exec(
                 select(SearchResultItem)
                 .where(SearchResultItem.id == item_id)
             ).first()
             
-            if item and item.html_path and not overwrite:
-                logger.info(f"Skipping {url} (item_id: {item_id}) - already has html_path: {item.html_path}")
-                return {"search_result_model": item, "crawl_result": None, 'message': 'already has html_path'}
+            if item and item.save_path and not overwrite:
+                logger.info(f"Skipping {url} (item_id: {item_id}) - already has save_path: {item.save_path}")
+                return {"search_result_model": item, "crawl_result": None, 'message': 'already has save_path'}
                 
         if not browser_config:
             browser_config = BrowserConfig(
@@ -80,26 +88,30 @@ class URLCrawler:
                     content_type = response.headers.get('content-type', '').lower()
                     if 'pdf' in content_type:
                         pdf_path = output_dir / f"{item_id}.pdf"
-                logger.info(f"crwal id {item_id} content_type {content_type} {pdf_path}")
-                if pdf_path:
-                    if await self.download_pdf(url, pdf_path):
-                        # Update database with PDF path
-                        with Session(self.db_manager.engine) as session:
-                            item = session.exec(
-                                select(SearchResultItem)
-                                .where(SearchResultItem.id == item_id)
-                            ).first()
-                            if item:
-                                item.html_path = str(pdf_path)
-                                session.add(item)
-                                session.commit()
-                        return {"search_result_model": item, "crawl_result": None, 'message': response.headers.get('content-type')}
             except Exception as e:
                 logger.warning(f"Failed to check headers for id: {item_id} , {url} {str(e)}")
-        #         return {"search_result_model": None, "crawl_result": None, 'message': str(e)}
-        # if 'html' not in content_type:
-        #     logger.info(f"Skipping {url} (item_id: {item_id}) - not html, conent_type {content_type}")
-        #     return {"search_result_model": None, "crawl_result": None,'message': f'not html, content_type {content_type}'}
+        logger.info(f"crawl id {item_id} content_type {content_type} {pdf_path}")
+        search_result_model = item or SearchResultItem(id=item_id)
+        search_result_model.content_type = content_type
+        if pdf_path:
+            try:
+                if await self.download_pdf(url, pdf_path):
+                    search_result_model.save_path = str(pdf_path)
+                else:
+                    logger.warning(f"Failed to download PDF for id: {item_id}, {url}")
+                    return CrawlerResult(err=1, message='failed to download pdf', search_result_model=None, crawl_result=None)
+                self.db_manager.add_or_update_search_result_item(search_result_model)
+                logger.info(f"{item_id} download_pdf success {pdf_path}")
+                # The PDF branch must return here: for file-type content, crawl4ai downloads
+                # to its own default path, so we would not get the auto-downloaded PDF unless we also monitored whether that download succeeded.
+                return CrawlerResult(err=0, message='success', search_result_model=search_result_model, crawl_result=None)
+            except Exception as e:
+                logger.warning(f"Failed to download PDF for id: {item_id}, {url} {str(e)}")
+                return CrawlerResult(err=1, message=str(e), search_result_model=None, crawl_result=None)
+
+        if 'html' not in content_type:
+            logger.info(f"Skipping {url} (item_id: {item_id}) - not html, content_type {content_type}")
+            return CrawlerResult(err=2, message='not html', search_result_model=search_result_model, crawl_result=None)
         logger.info(f"crawler.arun start {item_id} content-type: {content_type}, {url} ")    
         logger.info(f"browser_config use_managed_browser {browser_config.use_managed_browser} , cdp_url: {browser_config.cdp_url}, headless: {browser_config.headless}")
         # If not PDF or header check failed, try regular crawl
@@ -110,22 +122,7 @@ class URLCrawler:
             crawl_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
             result:CrawlResult = await crawler.arun(url=url, config=crawl_config)
             logger.info(f"{item_id} crawler.arun result.success: {result.success} {result.status_code}")
-            # If crawl failed but URL contains 'download', try PDF again
-            if not result.success and 'download' in url.lower():
-                pdf_path = output_dir / f"{item_id}.pdf"
-                if await self.download_pdf(url, pdf_path):
-                    # Update database with PDF path
-                    with Session(self.db_manager.engine) as session:
-                        item = session.exec(
-                            select(SearchResultItem)
-                            .where(SearchResultItem.id == item_id)
-                        ).first()
-                        if item:
-                            item.html_path = str(pdf_path)
-                            session.add(item)
-                            session.commit()
-                            session.refresh(item)
-                    return {"search_result_model": item, "crawl_result": result}
+            
                 
             # Save results
             ensure_output_dir(output_dir)
@@ -136,32 +133,24 @@ class URLCrawler:
                 pickle.dump(result, f)
                 
             # Save HTML and Markdown if available
-            html_path = None
+            save_path = None
             if result.html:
-                html_path = output_dir / f"{item_id}.html"
-                save_to_file(result.html, html_path)
+                save_path = output_dir / f"{item_id}.html"
+                save_to_file(result.html, save_path)
+                search_result_model.save_path = str(save_path)
                 
             if result.markdown:
                 md_path = output_dir / f"{item_id}.md"
                 save_to_file(result.markdown, md_path)
-                
+                search_result_model.markdown_path = str(md_path)
             # Update database with HTML path
-            if html_path:
-                with Session(self.db_manager.engine) as session:
-                    item = session.exec(
-                        select(SearchResultItem)
-                        .where(SearchResultItem.id == item_id)
-                    ).first()
-                    if item:
-                        item.html_path = str(html_path)
-                        session.add(item)
-                        session.commit()
-                        session.refresh(item)
+            if save_path:
+                self.db_manager.add_or_update_search_result_item(search_result_model)
             logger.info(f"{item_id} crawler.arun result.success: {item}")
-            return {"search_result_model": item, "crawl_result": result}
+            return CrawlerResult(err=0, message='success', search_result_model=search_result_model, crawl_result=result)
         except Exception as e:
             logger.error(f"Failed to crawl id: {item_id} , {url} {str(e)}")
-            return {"search_result_model": item, "crawl_result": None, 'message': str(e)}    
+            return CrawlerResult(err=1, message=str(e), search_result_model=search_result_model, crawl_result=None)  # result may be unbound if arun itself raised
         finally:
             await crawler.close()
 
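
The success and error paths of crawl_url now return the new CrawlerResult Pydantic model instead of a bare dict (the early "already has save_path" skip still returns a dict). Below is a rough caller sketch under that assumption, using the err codes from the diff above (0 = success, 1 = failure, 2 = skipped non-HTML content); the URL, item_id, and output directory are placeholders.

    # Assumed caller sketch; URLCrawler and CrawlerResult come from worker/crawl_pages/crawl_urls.py.
    import asyncio
    from pathlib import Path
    from worker.crawl_pages.crawl_urls import URLCrawler, CrawlerResult

    async def handle_one(url: str, item_id: int) -> None:
        crawler = URLCrawler()
        res = await crawler.crawl_url(url, item_id, Path("output/results"))
        if isinstance(res, CrawlerResult) and res.err == 0:
            # Either the PDF was downloaded or the HTML/Markdown was saved; paths live on the model
            print(res.search_result_model.save_path, res.search_result_model.markdown_path)
        elif isinstance(res, CrawlerResult) and res.err == 2:
            print(f"skipped non-HTML content ({res.search_result_model.content_type})")
        else:
            # CrawlerResult with err=1, or the legacy dict from the skip path
            print(f"crawl failed or skipped: {getattr(res, 'message', res)}")

    # asyncio.run(handle_one("https://example.com", 1))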

+ 0 - 1
worker/html_convert/converter_base.py

@@ -15,7 +15,6 @@ class ConverterBase:
     
     def __init__(self):
         self.db_manager = SearchResultManager()
-
     def get_search_result_item(self, result_id: int) -> Optional[SearchResultItem]:
         """Get the search result item by ID"""
         with Session(self.db_manager.engine) as session:

+ 18 - 4
worker/search_engine/search_result_db.py

@@ -42,7 +42,9 @@ class SearchResultItem(SQLModel, table=True):
     url: str
     title: Optional[str] = None
     content: Optional[str] = None
-    html_path: Optional[str] = None
+    content_type: Optional[str] = None
+    save_path: Optional[str] = None
+    markdown_path: Optional[str] = None
     keyword_id: int = Field(foreign_key="keywordtask.id")
     keyword: str = Field(index=True)
     page_id: int = Field(foreign_key="searchpageresult.id")
@@ -185,7 +187,7 @@ class SearchResultManager:
                         url=item.url,
                         title=item.title,
                         content=item.content,
-                        html_path=str(html_path) if html_path else None,
+                        save_path=str(html_path) if html_path else None,
                         keyword_id=keyword_task.id,
                         keyword=keyword,
                         page_id=page_id
@@ -233,7 +235,7 @@ class SearchResultManager:
             query = (
                 select(distinct(SearchPageResult.id))
                 .join(SearchResultItem, SearchPageResult.id == SearchResultItem.page_id)
-                .where(SearchResultItem.html_path.is_(None))
+                .where(SearchResultItem.save_path.is_(None))
             )
             page_ids = session.exec(query).all()
             return page_ids
@@ -268,5 +270,17 @@ class SearchResultManager:
                 session.refresh(verification_item)
                 return verification_item
             return exists
-        
+    def get_complete_search_result_items(self) -> list[SearchResultItem]:
+        """Get all successful search result items"""
+        with Session(self.engine) as session:
+            return session.exec(
+                select(SearchResultItem)
+                .where(SearchResultItem.save_path.is_not(None))
+            ).all()
+    def add_or_update_search_result_item(self, search_result_item: SearchResultItem):
+        with Session(self.engine) as session:
+            session.add(search_result_item)
+            session.commit()
+            session.refresh(search_result_item)
+            return search_result_item
 db_manager = SearchResultManager()